diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index 7509b6d6b9cf7..166afc97a6c12 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -19,15 +19,18 @@ package org.apache.spark.deploy.history import org.apache.spark.ui.SparkUI -private[history] case class ApplicationHistoryInfo( - id: String, +private[history] case class ApplicationAttemptInfo( + attemptId: String, name: String, startTime: Long, endTime: Long, lastUpdated: Long, sparkUser: String, - completed: Boolean = false, - appAttemptId: String = "") + completed: Boolean = false) + +private[history] case class ApplicationHistoryInfo( + id: String, + attempts: List[ApplicationAttemptInfo]) private[history] abstract class ApplicationHistoryProvider { @@ -42,9 +45,11 @@ private[history] abstract class ApplicationHistoryProvider { * Returns the Spark UI for a specific application. * * @param appId The application ID. + * @param attemptId The application attempt ID for apps with multiple attempts (or an empty + * string for apps with a single attempt). * @return The application's UI, or None if application is not found. */ - def getAppUI(appId: String): Option[SparkUI] + def getAppUI(appId: String, attemptId: String): Option[SparkUI] /** * Called when the server is shutting down. diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 7e31a299ae925..2f48a20f1e5a6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -143,31 +143,34 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values - override def getAppUI(appId: String): Option[SparkUI] = { + override def getAppUI(appId: String, attemptId: String): Option[SparkUI] = { try { - applications.get(appId).map { info => - val replayBus = new ReplayListenerBus() - val ui = { - val conf = this.conf.clone() - val appSecManager = new SecurityManager(conf) - SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId, - s"${HistoryServer.UI_PATH_PREFIX}/$appId") - // Do not call ui.bind() to avoid creating a new server for each application - } + applications.get(appId).flatMap { info => + val attempts = info.attempts.filter(_.attemptId == attemptId) + attempts.headOption.map { attempt => + val replayBus = new ReplayListenerBus() + val ui = { + val conf = this.conf.clone() + val appSecManager = new SecurityManager(conf) + SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId, + s"${HistoryServer.UI_PATH_PREFIX}/$appId") + // Do not call ui.bind() to avoid creating a new server for each application + } - val appListener = new ApplicationEventListener() - replayBus.addListener(appListener) - val appInfo = replay(fs.getFileStatus(new Path(logDir, info.logPath)), replayBus) + val appListener = new ApplicationEventListener() + replayBus.addListener(appListener) + val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus) - ui.setAppName(s"${appInfo.name} ($appId)") + ui.setAppName(s"${attempt.name} ($appId)") - val uiAclsEnabled = 
conf.getBoolean("spark.history.ui.acls.enable", false) - ui.getSecurityManager.setAcls(uiAclsEnabled) - // make sure to set admin acls before view acls so they are properly picked up - ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse("")) - ui.getSecurityManager.setViewAcls(appInfo.sparkUser, - appListener.viewAcls.getOrElse("")) - ui + val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) + ui.getSecurityManager.setAcls(uiAclsEnabled) + // make sure to set admin acls before view acls so they are properly picked up + ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse("")) + ui.getSecurityManager.setViewAcls(attempt.sparkUser, + appListener.viewAcls.getOrElse("")) + ui + } } } catch { case e: FileNotFoundException => None @@ -225,7 +228,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis */ private def mergeApplicationListing(logs: Seq[FileStatus]): Unit = { val bus = new ReplayListenerBus() - val newApps = logs.flatMap { fileStatus => + val newAttempts = logs.flatMap { fileStatus => try { val res = replay(fileStatus, bus) logInfo(s"Application log ${res.logPath} loaded successfully.") @@ -237,41 +240,52 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis e) None } - }.toSeq.sortWith(compareAppInfo) - - // When there are new logs, merge the new list with the existing one, maintaining - // the expected ordering (descending end time). Maintaining the order is important - // to avoid having to sort the list every time there is a request for the log list. - if (newApps.nonEmpty) { - val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() - def addIfAbsent(info: FsApplicationHistoryInfo): Unit = { - if (!mergedApps.contains(info.id) || - mergedApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) && - !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) { - val key = - if (info.appAttemptId.equals("")) { - info.id - } else { - info.id + "_" + info.appAttemptId - } - mergedApps += (key -> info) - } - } + } - val newIterator = newApps.iterator.buffered - val oldIterator = applications.values.iterator.buffered - while (newIterator.hasNext && oldIterator.hasNext) { - if (compareAppInfo(newIterator.head, oldIterator.head)) { - addIfAbsent(newIterator.next()) - } else { - addIfAbsent(oldIterator.next()) + if (newAttempts.isEmpty) { + return + } + + // Build a map containing all apps that contain new attempts. The app information in this map + // contains both the new app attempt, and those that were already loaded in the existing apps + // map. If an attempt has been updated, it replaces the old attempt in the list. + val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]() + newAttempts.foreach { attempt => + val appInfo = applications.get(attempt.appId) + .orElse(newAppMap.get(attempt.appId)) + .map { app => + val attempts = + app.attempts.filter(_.attemptId != attempt.attemptId).toList ++ List(attempt) + new FsApplicationHistoryInfo(attempt.appId, attempts.sortWith(compareAttemptInfo)) } + .getOrElse(new FsApplicationHistoryInfo(attempt.appId, List(attempt))) + newAppMap(attempt.appId) = appInfo + } + + // Merge the new app list with the existing one, maintaining the expected ordering (descending + // end time). Maintaining the order is important to avoid having to sort the list every time + // there is a request for the log list. 
+ val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo) + val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() + def addIfAbsent(info: FsApplicationHistoryInfo): Unit = { + if (!mergedApps.contains(info.id)) { + mergedApps += (info.id -> info) } - newIterator.foreach(addIfAbsent) - oldIterator.foreach(addIfAbsent) + } - applications = mergedApps + val newIterator = newApps.iterator.buffered + val oldIterator = applications.values.iterator.buffered + while (newIterator.hasNext && oldIterator.hasNext) { + if (compareAppInfo(newIterator.head, oldIterator.head)) { + addIfAbsent(newIterator.next()) + } else { + addIfAbsent(oldIterator.next()) + } } + newIterator.foreach(addIfAbsent) + oldIterator.foreach(addIfAbsent) + + applications = mergedApps } /** @@ -288,7 +302,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() applications.values.foreach { info => - if (now - info.lastUpdated <= maxAge) { + if (now - info.attempts.head.lastUpdated <= maxAge) { appsToRetain += (info.id -> info) } } @@ -321,6 +335,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private def compareAppInfo( i1: FsApplicationHistoryInfo, i2: FsApplicationHistoryInfo): Boolean = { + compareAttemptInfo(i1.attempts.head, i2.attempts.head) + } + + private def compareAttemptInfo( + i1: FsApplicationAttemptInfo, + i2: FsApplicationAttemptInfo): Boolean = { if (i1.endTime != i2.endTime) i1.endTime >= i2.endTime else i1.startTime >= i2.startTime } @@ -328,7 +348,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis * Replays the events in the specified log file and returns information about the associated * application. 
*/ - private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = { + private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = { val logPath = eventLog.getPath() logInfo(s"Replaying log path: $logPath") val logInput = @@ -341,16 +361,16 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis val appListener = new ApplicationEventListener bus.addListener(appListener) bus.replay(logInput, logPath.toString) - new FsApplicationHistoryInfo( + new FsApplicationAttemptInfo( logPath.getName(), appListener.appId.getOrElse(logPath.getName()), + appListener.appAttemptId.getOrElse(""), appListener.appName.getOrElse(NOT_STARTED), appListener.startTime.getOrElse(-1L), appListener.endTime.getOrElse(-1L), getModificationTime(eventLog).get, appListener.sparkUser.getOrElse(NOT_STARTED), - isApplicationCompleted(eventLog), - appListener.appAttemptId.getOrElse("")) + isApplicationCompleted(eventLog)) } finally { logInput.close() } @@ -437,15 +457,20 @@ private object FsHistoryProvider { val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds } -private class FsApplicationHistoryInfo( +private class FsApplicationAttemptInfo( val logPath: String, - id: String, + val appId: String, + attemptId: String, name: String, startTime: Long, endTime: Long, lastUpdated: Long, sparkUser: String, - completed: Boolean = true, - appAttemptId: String ="") - extends ApplicationHistoryInfo( - id, name, startTime, endTime, lastUpdated, sparkUser, completed, appAttemptId) + completed: Boolean = true) + extends ApplicationAttemptInfo( + attemptId, name, startTime, endTime, lastUpdated, sparkUser, completed) + +private class FsApplicationHistoryInfo( + id: String, + override val attempts: List[FsApplicationAttemptInfo]) + extends ApplicationHistoryInfo(id, attempts) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 87159715b34e8..e51256c40122b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -37,18 +37,13 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") val requestedIncomplete = Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean - val allCompletedAppsNAttempts = - parent.getApplicationList().filter(_.completed != requestedIncomplete) - val (hasAttemptInfo, appToAttemptMap) = getApplicationLevelList(allCompletedAppsNAttempts) - - val allAppsSize = allCompletedAppsNAttempts.size - + val allApps = parent.getApplicationList() + .filter(_.attempts.exists(_.completed != requestedIncomplete)) + val allAppsSize = allApps.size + val actualFirst = if (requestedFirst < allAppsSize) requestedFirst else 0 - val apps = - allCompletedAppsNAttempts.slice(actualFirst, Math.min(actualFirst + pageSize, allAppsSize)) - val appWithAttemptsDisplayList = - appToAttemptMap.slice(actualFirst, Math.min(actualFirst + pageSize, allAppsSize)) - + val appsToShow = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allAppsSize)) + val actualPage = (actualFirst / pageSize) + 1 val last = Math.min(actualFirst + pageSize, allAppsSize) - 1 val pageCount = allAppsSize / pageSize + (if (allAppsSize % pageSize > 0) 1 else 0) @@ -56,12 +51,14 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") val secondPageFromLeft = 2 val 
secondPageFromRight = pageCount - 1 - val appTable = - if (hasAttemptInfo) { - UIUtils.listingTable(appWithAttemptHeader, appWithAttemptRow, appWithAttemptsDisplayList) + val hasMultipleAttempts = appsToShow.exists(_.attempts.size > 1) + val appTable = + if (hasMultipleAttempts) { + UIUtils.listingTable(appWithAttemptHeader, appWithAttemptRow, appsToShow) } else { - UIUtils.listingTable(appHeader, appRow, apps) + UIUtils.listingTable(appHeader, appRow, appsToShow) } + val providerConfig = parent.getProviderConfig() val content =
@@ -129,36 +126,6 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
UIUtils.basicSparkPage(content, "History Server") } - - private def getApplicationLevelList (appNattemptList: Iterable[ApplicationHistoryInfo]) ={ - // Create HashMap as per the multiple attempts for one application. - // If there is no attempt specific stuff, then - // do return false, to indicate the same, so that previous UI gets displayed. - var hasAttemptInfo = false - val appToAttemptInfo = new HashMap[String, ArrayBuffer[ApplicationHistoryInfo]] - for( appAttempt <- appNattemptList) { - if(!appAttempt.appAttemptId.equals("")){ - hasAttemptInfo = true - val attemptId = appAttempt.appAttemptId.toInt - if(appToAttemptInfo.contains(appAttempt.id)){ - val currentAttempts = appToAttemptInfo.get(appAttempt.id).get - currentAttempts += appAttempt - appToAttemptInfo.put( appAttempt.id, currentAttempts) - } else { - val currentAttempts = new ArrayBuffer[ApplicationHistoryInfo]() - currentAttempts += appAttempt - appToAttemptInfo.put( appAttempt.id, currentAttempts ) - } - }else { - val currentAttempts = new ArrayBuffer[ApplicationHistoryInfo]() - currentAttempts += appAttempt - appToAttemptInfo.put(appAttempt.id, currentAttempts) - } - } - val sortedMap = ListMap(appToAttemptInfo.toSeq.sortWith(_._1 > _._1):_*) - (hasAttemptInfo, sortedMap) - } - private val appHeader = Seq( "App ID", @@ -169,12 +136,6 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") "Spark User", "Last Updated") - private def rangeIndices(range: Seq[Int], condition: Int => Boolean, showIncomplete: Boolean): - Seq[Node] = { - range.filter(condition).map(nextPage => - {nextPage} ) - } - private val appWithAttemptHeader = Seq( "App ID", "App Name", @@ -185,85 +146,70 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") "Spark User", "Last Updated") - private def appRow(info: ApplicationHistoryInfo): Seq[Node] = { - val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}" - val startTime = UIUtils.formatDate(info.startTime) - val endTime = if (info.endTime > 0) UIUtils.formatDate(info.endTime) else "-" - val duration = - if (info.endTime > 0) UIUtils.formatDuration(info.endTime - info.startTime) else "-" - val lastUpdated = UIUtils.formatDate(info.lastUpdated) - - {info.id} - {info.name} - {startTime} - {endTime} - {duration} - {info.sparkUser} - {lastUpdated} - + private def rangeIndices( + range: Seq[Int], + condition: Int => Boolean, + showIncomplete: Boolean): Seq[Node] = { + range.filter(condition).map(nextPage => + {nextPage} ) } - - private def getAttemptURI(attemptInfo: ApplicationHistoryInfo, - returnEmptyIfAttemptInfoNull: Boolean = true ) = { - if (attemptInfo.appAttemptId.equals("")) { - if(returnEmptyIfAttemptInfoNull) { - attemptInfo.appAttemptId - } else { - HistoryServer.UI_PATH_PREFIX + s"/${attemptInfo.id}" - } - } else { - HistoryServer.UI_PATH_PREFIX + s"/${attemptInfo.id}" + "_" + s"${attemptInfo.appAttemptId}" - } + + private def getAttemptURI(appId: String, attemptInfo: ApplicationAttemptInfo): String = { + val attemptSuffix = if (!attemptInfo.attemptId.isEmpty) s"/${attemptInfo.attemptId}" else "" + s"${HistoryServer.UI_PATH_PREFIX}/${appId}${attemptSuffix}" } - - private def firstAttemptRow(attemptInfo : ApplicationHistoryInfo) = { - val uiAddress = - if (attemptInfo.appAttemptId.equals("")) { - attemptInfo.appAttemptId - } else { - HistoryServer.UI_PATH_PREFIX + s"/${attemptInfo.id}" + "_" + s"${attemptInfo.appAttemptId}" - } - + + private def attemptRow( + info: ApplicationHistoryInfo, + attempt: ApplicationAttemptInfo, + isFirst: 
Boolean): Seq[Node] = { + val attemptInfo = info.attempts.head + val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}" val startTime = UIUtils.formatDate(attemptInfo.startTime) - val endTime = UIUtils.formatDate(attemptInfo.endTime) - val duration = UIUtils.formatDuration(attemptInfo.endTime - attemptInfo.startTime) + val endTime = if (attemptInfo.endTime > 0) UIUtils.formatDate(attemptInfo.endTime) else "-" + val duration = + if (attemptInfo.endTime > 0) { + UIUtils.formatDuration(attemptInfo.endTime - attemptInfo.startTime) + } else { + "-" + } val lastUpdated = UIUtils.formatDate(attemptInfo.lastUpdated) - val attemptId = attemptInfo.appAttemptId - {attemptId} - {startTime} - {endTime} - - {duration} - {attemptInfo.sparkUser} - {lastUpdated} - } - - private def attemptRow(attemptInfo: ApplicationHistoryInfo) = { - {firstAttemptRow(attemptInfo)} + { + if (isFirst) { + if (info.attempts.size > 1) { + {info.id} + } else { + {info.id} + } + } else { + new xml.Comment("") + } + } + { + if (info.attempts.size > 1 && !attempt.attemptId.isEmpty) { + {attempt.attemptId} + } else { + Nil + } + } + {attempt.name} + {startTime} + {endTime} + + {duration} + {attempt.sparkUser} + {lastUpdated} } - - private def appWithAttemptRow( - appAttemptsInfo: (String,ArrayBuffer[ApplicationHistoryInfo])): Seq[Node] = { - val applicationId = appAttemptsInfo._1 - val info = appAttemptsInfo._2 - val rowSpan = info.length - val rowSpanString = rowSpan.toString - val applicatioName = info(0).name - val lastAttemptURI = getAttemptURI(info(0), false) - val ttAttempts = info.slice(1, rowSpan -1) - val x = new xml.NodeBuffer - x += - - {applicationId} - {applicatioName} - { firstAttemptRow(info(0)) } - ; - for( i <- 1 until rowSpan ){ - x += attemptRow(info(i)) - } - x + + private def appRow(info: ApplicationHistoryInfo): Seq[Node] = { + attemptRow(info, info.attempts.head, true) + } + + private def appWithAttemptRow(info: ApplicationHistoryInfo): Seq[Node] = { + attemptRow(info, info.attempts.head, true) ++ + info.attempts.drop(1).flatMap(attemptRow(info, _, false)) } private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 72f6048239297..b7b4355351619 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -52,7 +52,11 @@ class HistoryServer( private val appLoader = new CacheLoader[String, SparkUI] { override def load(key: String): SparkUI = { - val ui = provider.getAppUI(key).getOrElse(throw new NoSuchElementException()) + val parts = key.split("/") + require(parts.length == 1 || parts.length == 2, s"Invalid app key $key") + val ui = provider + .getAppUI(parts(0), if (parts.length > 1) parts(1) else "") + .getOrElse(throw new NoSuchElementException()) attachSparkUI(ui) ui } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index fcae603c7d18e..a75ade3cc6f18 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -50,7 +50,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers inProgress: Boolean, codec: Option[String] = None): File = { val 
ip = if (inProgress) EventLoggingListener.IN_PROGRESS else "" - val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId) + val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, "") val logPath = new URI(logUri).getPath + ip new File(logPath) } @@ -106,22 +106,28 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers val list = provider.getListing().toSeq list should not be (null) list.size should be (5) - list.count(_.completed) should be (3) + list.count(_.attempts.head.completed) should be (3) - list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L, + def makeAppInfo(id: String, name: String, start: Long, end: Long, lastMod: Long, + user: String, completed: Boolean): ApplicationHistoryInfo = { + ApplicationHistoryInfo(id, + List(ApplicationAttemptInfo("", name, start, end, lastMod, user, completed))) + } + + list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L, newAppComplete.lastModified(), "test", true)) - list(1) should be (ApplicationHistoryInfo(newAppCompressedComplete.getName(), + list(1) should be (makeAppInfo(newAppCompressedComplete.getName(), "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test", true)) - list(2) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L, + list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L, oldAppComplete.lastModified(), "test", true)) - list(3) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, - -1L, oldAppIncomplete.lastModified(), "test", false)) - list(4) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, - -1L, newAppIncomplete.lastModified(), "test", false)) + list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L, + oldAppIncomplete.lastModified(), "test", false)) + list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L, + newAppIncomplete.lastModified(), "test", false)) // Make sure the UI can be rendered. 
     list.foreach { case info =>
-      val appUi = provider.getAppUI(info.id)
+      val appUi = provider.getAppUI(info.id, "")
       appUi should not be null
     }
   }
@@ -190,13 +196,14 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
     provider.checkForLogs()
     val appListBeforeRename = provider.getListing()
     appListBeforeRename.size should be (1)
-    appListBeforeRename.head.logPath should endWith(EventLoggingListener.IN_PROGRESS)
+    appListBeforeRename.head.attempts.head.logPath should endWith(EventLoggingListener.IN_PROGRESS)
 
     logFile1.renameTo(newLogFile("app1", inProgress = false))
     provider.checkForLogs()
     val appListAfterRename = provider.getListing()
     appListAfterRename.size should be (1)
-    appListAfterRename.head.logPath should not endWith(EventLoggingListener.IN_PROGRESS)
+    appListAfterRename.head.attempts.head.logPath should not endWith(
+      EventLoggingListener.IN_PROGRESS)
   }
 
   test("SPARK-5582: empty log directory") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 3a9963a5ce7b7..b750a9e49d1be 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -36,7 +36,8 @@ class HistoryServerSuite extends FunSuite with Matchers with MockitoSugar {
     val request = mock[HttpServletRequest]
     val ui = mock[SparkUI]
     val link = "/history/app1"
-    val info = new ApplicationHistoryInfo("app1", "app1", 0, 2, 1, "xxx", true)
+    val info = new ApplicationHistoryInfo("app1",
+      List(ApplicationAttemptInfo("attempt1", "app1", 0, 2, 1, "xxx", true)))
     when(historyServer.getApplicationList()).thenReturn(Seq(info))
     when(ui.basePath).thenReturn(link)
     when(historyServer.getProviderConfig()).thenReturn(Map[String, String]())
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 30ee63e78d9d8..2c3efd8b71e2a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -95,7 +95,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
   }
 
   test("Log overwriting") {
-    val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test")
+    val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test", "")
     val logPath = new URI(logUri).getPath
     // Create file before writing the event log
     new FileOutputStream(new File(logPath)).close()
@@ -108,18 +108,18 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
   test("Event log name") {
     // without compression
    assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath(
-      Utils.resolveURI("/base-dir"), "app1"))
+      Utils.resolveURI("/base-dir"), "app1", ""))
     // with compression
     assert(s"file:/base-dir/app1.lzf" ===
-      EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", Some("lzf")))
+      EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", "", Some("lzf")))
     // illegal characters in app ID
     assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
       EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
-      "a fine:mind$dollar{bills}.1"))
+      "a fine:mind$dollar{bills}.1", ""))
     // illegal characters in app ID with compression
     assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
       EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
-      "a fine:mind$dollar{bills}.1", Some("lz4")))
+      "a fine:mind$dollar{bills}.1", "", Some("lz4")))
   }
 
   /* ----------------- *
@@ -186,7 +186,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
     val eventLogPath = eventLogger.logPath
     val expectedLogDir = testDir.toURI()
     assert(eventLogPath === EventLoggingListener.getLogPath(
-      expectedLogDir, sc.applicationId, compressionCodec.map(CompressionCodec.getShortName)))
+      expectedLogDir, sc.applicationId, "", compressionCodec.map(CompressionCodec.getShortName)))
 
     // Begin listening for events that trigger asserts
     val eventExistenceListener = new EventExistenceListener(eventLogger)
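
Note: the following standalone sketch is illustrative only and not part of the patch; the object and method names are invented. It shows the attempt-aware conventions the change introduces: an application with a single, unnamed attempt keeps its old /history/<appId> address, named attempts are served at /history/<appId>/<attemptId>, and the HistoryServer UI cache key is either "appId" or "appId/attemptId".

object AttemptKeyExample {
  val UI_PATH_PREFIX = "/history"

  // Mirrors HistoryPage.getAttemptURI: an empty attempt ID keeps the single-attempt URL.
  def attemptURI(appId: String, attemptId: String): String = {
    val attemptSuffix = if (!attemptId.isEmpty) s"/$attemptId" else ""
    s"$UI_PATH_PREFIX/$appId$attemptSuffix"
  }

  // Mirrors HistoryServer.appLoader: a cache key is either "appId" or "appId/attemptId".
  def parseKey(key: String): (String, String) = {
    val parts = key.split("/")
    require(parts.length == 1 || parts.length == 2, s"Invalid app key $key")
    (parts(0), if (parts.length > 1) parts(1) else "")
  }

  def main(args: Array[String]): Unit = {
    println(attemptURI("app-1", ""))            // /history/app-1
    println(attemptURI("app-1", "attempt2"))    // /history/app-1/attempt2
    println(parseKey("app-1/attempt2"))         // (app-1,attempt2)
  }
}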