diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index cee07c3116a5a..3ed1f8157e496 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1757,7 +1757,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // Note: this code assumes that the task scheduler has been initialized and has contacted // the cluster manager to get an application ID (in case the cluster manager provides one). listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId), - startTime, sparkUser)) + startTime, sparkUser, applicationAttemptId)) } /** Post the application end event */ diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index ea6c85ee511d5..7509b6d6b9cf7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -26,7 +26,8 @@ private[history] case class ApplicationHistoryInfo( endTime: Long, lastUpdated: Long, sparkUser: String, - completed: Boolean = false) + completed: Boolean = false, + appAttemptId: String = "") private[history] abstract class ApplicationHistoryProvider { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 9d40d8c8fd7a8..7e31a299ae925 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -248,7 +248,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis if (!mergedApps.contains(info.id) || mergedApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) && !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) { - mergedApps += (info.id -> info) + val key = + if (info.appAttemptId.equals("")) { + info.id + } else { + info.id + "_" + info.appAttemptId + } + mergedApps += (key -> info) } } @@ -343,7 +349,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis appListener.endTime.getOrElse(-1L), getModificationTime(eventLog).get, appListener.sparkUser.getOrElse(NOT_STARTED), - isApplicationCompleted(eventLog)) + isApplicationCompleted(eventLog), + appListener.appAttemptId.getOrElse("")) } finally { logInput.close() } @@ -438,5 +445,7 @@ private class FsApplicationHistoryInfo( endTime: Long, lastUpdated: Long, sparkUser: String, - completed: Boolean = true) - extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed) + completed: Boolean = true, + appAttemptId: String ="") + extends ApplicationHistoryInfo( + id, name, startTime, endTime, lastUpdated, sparkUser, completed, appAttemptId) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 6e432d63c6b5a..87159715b34e8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -22,6 +22,9 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.ui.{WebUIPage, UIUtils} +import scala.collection.immutable.ListMap +import scala.collection.mutable.HashMap +import scala.collection.mutable.ArrayBuffer private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { @@ -34,18 +37,31 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") val requestedIncomplete = Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean - val allApps = parent.getApplicationList().filter(_.completed != requestedIncomplete) - val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0 - val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size)) - + val allCompletedAppsNAttempts = + parent.getApplicationList().filter(_.completed != requestedIncomplete) + val (hasAttemptInfo, appToAttemptMap) = getApplicationLevelList(allCompletedAppsNAttempts) + + val allAppsSize = allCompletedAppsNAttempts.size + + val actualFirst = if (requestedFirst < allAppsSize) requestedFirst else 0 + val apps = + allCompletedAppsNAttempts.slice(actualFirst, Math.min(actualFirst + pageSize, allAppsSize)) + val appWithAttemptsDisplayList = + appToAttemptMap.slice(actualFirst, Math.min(actualFirst + pageSize, allAppsSize)) + val actualPage = (actualFirst / pageSize) + 1 - val last = Math.min(actualFirst + pageSize, allApps.size) - 1 - val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0) + val last = Math.min(actualFirst + pageSize, allAppsSize) - 1 + val pageCount = allAppsSize / pageSize + (if (allAppsSize % pageSize > 0) 1 else 0) val secondPageFromLeft = 2 val secondPageFromRight = pageCount - 1 - val appTable = UIUtils.listingTable(appHeader, appRow, apps) + val appTable = + if (hasAttemptInfo) { + UIUtils.listingTable(appWithAttemptHeader, appWithAttemptRow, appWithAttemptsDisplayList) + } else { + UIUtils.listingTable(appHeader, appRow, apps) + } val providerConfig = parent.getProviderConfig() val content =
@@ -59,7 +75,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") // to the first and last page. If the current page +/- `plusOrMinus` is greater // than the 2nd page from the first page or less than the 2nd page from the last // page, `...` will be displayed. - if (allApps.size > 0) { + if (allAppsSize > 0) { val leftSideIndices = rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _, requestedIncomplete) val rightSideIndices = @@ -67,7 +83,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") requestedIncomplete)

- Showing {actualFirst + 1}-{last + 1} of {allApps.size} + Showing {actualFirst + 1}-{last + 1} of {allAppsSize} {if (requestedIncomplete) "(Incomplete applications)"} { @@ -113,6 +129,36 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")

UIUtils.basicSparkPage(content, "History Server") } + + private def getApplicationLevelList (appNattemptList: Iterable[ApplicationHistoryInfo]) ={ + // Create HashMap as per the multiple attempts for one application. + // If there is no attempt specific stuff, then + // do return false, to indicate the same, so that previous UI gets displayed. + var hasAttemptInfo = false + val appToAttemptInfo = new HashMap[String, ArrayBuffer[ApplicationHistoryInfo]] + for( appAttempt <- appNattemptList) { + if(!appAttempt.appAttemptId.equals("")){ + hasAttemptInfo = true + val attemptId = appAttempt.appAttemptId.toInt + if(appToAttemptInfo.contains(appAttempt.id)){ + val currentAttempts = appToAttemptInfo.get(appAttempt.id).get + currentAttempts += appAttempt + appToAttemptInfo.put( appAttempt.id, currentAttempts) + } else { + val currentAttempts = new ArrayBuffer[ApplicationHistoryInfo]() + currentAttempts += appAttempt + appToAttemptInfo.put( appAttempt.id, currentAttempts ) + } + }else { + val currentAttempts = new ArrayBuffer[ApplicationHistoryInfo]() + currentAttempts += appAttempt + appToAttemptInfo.put(appAttempt.id, currentAttempts) + } + } + val sortedMap = ListMap(appToAttemptInfo.toSeq.sortWith(_._1 > _._1):_*) + (hasAttemptInfo, sortedMap) + } + private val appHeader = Seq( "App ID", @@ -128,6 +174,16 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") range.filter(condition).map(nextPage => {nextPage} ) } + + private val appWithAttemptHeader = Seq( + "App ID", + "App Name", + "Attempt ID", + "Started", + "Completed", + "Duration", + "Spark User", + "Last Updated") private def appRow(info: ApplicationHistoryInfo): Seq[Node] = { val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}" @@ -146,6 +202,69 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {lastUpdated} } + + private def getAttemptURI(attemptInfo: ApplicationHistoryInfo, + returnEmptyIfAttemptInfoNull: Boolean = true ) = { + if (attemptInfo.appAttemptId.equals("")) { + if(returnEmptyIfAttemptInfoNull) { + attemptInfo.appAttemptId + } else { + HistoryServer.UI_PATH_PREFIX + s"/${attemptInfo.id}" + } + } else { + HistoryServer.UI_PATH_PREFIX + s"/${attemptInfo.id}" + "_" + s"${attemptInfo.appAttemptId}" + } + } + + private def firstAttemptRow(attemptInfo : ApplicationHistoryInfo) = { + val uiAddress = + if (attemptInfo.appAttemptId.equals("")) { + attemptInfo.appAttemptId + } else { + HistoryServer.UI_PATH_PREFIX + s"/${attemptInfo.id}" + "_" + s"${attemptInfo.appAttemptId}" + } + + val startTime = UIUtils.formatDate(attemptInfo.startTime) + val endTime = UIUtils.formatDate(attemptInfo.endTime) + val duration = UIUtils.formatDuration(attemptInfo.endTime - attemptInfo.startTime) + val lastUpdated = UIUtils.formatDate(attemptInfo.lastUpdated) + val attemptId = attemptInfo.appAttemptId + {attemptId} + {startTime} + {endTime} + + {duration} + {attemptInfo.sparkUser} + {lastUpdated} + } + + private def attemptRow(attemptInfo: ApplicationHistoryInfo) = { + + {firstAttemptRow(attemptInfo)} + + } + + private def appWithAttemptRow( + appAttemptsInfo: (String,ArrayBuffer[ApplicationHistoryInfo])): Seq[Node] = { + val applicationId = appAttemptsInfo._1 + val info = appAttemptsInfo._2 + val rowSpan = info.length + val rowSpanString = rowSpan.toString + val applicatioName = info(0).name + val lastAttemptURI = getAttemptURI(info(0), false) + val ttAttempts = info.slice(1, rowSpan -1) + val x = new xml.NodeBuffer + x += + + {applicationId} + {applicatioName} + { firstAttemptRow(info(0)) } + ; + for( i <- 1 until rowSpan ){ + x += attemptRow(info(i)) + } + x + } private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = { "/?" + Array( diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala index 6d39a5e3fa64c..a591c7e046d5d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala @@ -26,6 +26,7 @@ package org.apache.spark.scheduler private[spark] class ApplicationEventListener extends SparkListener { var appName: Option[String] = None var appId: Option[String] = None + var appAttemptId: Option[String] = None var sparkUser: Option[String] = None var startTime: Option[Long] = None var endTime: Option[Long] = None @@ -35,6 +36,7 @@ private[spark] class ApplicationEventListener extends SparkListener { override def onApplicationStart(applicationStart: SparkListenerApplicationStart) { appName = Some(applicationStart.appName) appId = applicationStart.appId + appAttemptId = Some(applicationStart.appAttemptId) startTime = Some(applicationStart.time) sparkUser = Some(applicationStart.sparkUser) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 02a2263940821..49826a4ae2377 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -266,12 +266,11 @@ private[spark] object EventLoggingListener extends Logging { appAttemptId: String, compressionCodecName: Option[String] = None): String = { val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase - - if (appAttemptId.equals("")) { - Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/") - } else { - Utils.resolveURI(logBaseDir) + "/" + appAttemptId + "/" + name.stripSuffix("/") - } + if (appAttemptId.equals("")) { + logBaseDir.toString.stripSuffix("/") + "/" + name.stripSuffix("/") + } else { + logBaseDir.toString.stripSuffix("/") + "/" + name.stripSuffix("/") + "_" + appAttemptId + } } def getLogPath( diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index b711ff209af94..9f526b08a7b42 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -110,8 +110,8 @@ case class SparkListenerExecutorMetricsUpdate( extends SparkListenerEvent @DeveloperApi -case class SparkListenerApplicationStart(appName: String, appId: Option[String], time: Long, - sparkUser: String) extends SparkListenerEvent +case class SparkListenerApplicationStart(appName: String, appId: Option[String], + time: Long, sparkUser: String, appAttemptId: String = "") extends SparkListenerEvent @DeveloperApi case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 474f79fb756f6..9dbf466278eb0 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -194,7 +194,8 @@ private[spark] object JsonProtocol { ("App Name" -> applicationStart.appName) ~ ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~ ("Timestamp" -> applicationStart.time) ~ - ("User" -> applicationStart.sparkUser) + ("User" -> applicationStart.sparkUser) ~ + ("appAttemptId" -> applicationStart.appAttemptId) } def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = { @@ -562,7 +563,8 @@ private[spark] object JsonProtocol { val appId = Utils.jsonOption(json \ "App ID").map(_.extract[String]) val time = (json \ "Timestamp").extract[Long] val sparkUser = (json \ "User").extract[String] - SparkListenerApplicationStart(appName, appId, time, sparkUser) + val appAttemptId = (json \ "appAttemptId").extract[String] + SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId) } def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {