diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index e51256c40122b..b6da1e09444fc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -160,6 +160,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
}
private def attemptRow(
+ renderAttemptIdColumn: Boolean,
info: ApplicationHistoryInfo,
attempt: ApplicationAttemptInfo,
isFirst: Boolean): Seq[Node] = {
@@ -178,22 +179,27 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
      {
        if (isFirst) {
          if (info.attempts.size > 1) {
-           <td rowspan={info.attempts.size.toString}>
-             <a href={uiAddress}>{info.id}</a></td>
+           <td rowspan={info.attempts.size.toString}>
+             <a href={uiAddress}>{info.id}</a></td> ++
+             <td rowspan={info.attempts.size.toString}>{attempt.name}</td>
          } else {
-           <td><a href={uiAddress}>{info.id}</a></td>
+           <td><a href={uiAddress}>{info.id}</a></td> ++
+             <td>{attempt.name}</td>
          }
        } else {
-         new xml.Comment("")
+         Nil
        }
      }
      {
-       if (info.attempts.size > 1 && !attempt.attemptId.isEmpty) {
-         <td><a href={uiAddress}>{attempt.attemptId}</a></td>
+       if (renderAttemptIdColumn) {
+         if (info.attempts.size > 1 && !attempt.attemptId.isEmpty) {
+           <td><a href={uiAddress}>{attempt.attemptId}</a></td>
+         } else {
+           <td>&nbsp;</td>
+         }
        } else {
          Nil
        }
      }
-     <td>{attempt.name}</td>
      <td>{startTime}</td>
      <td>{endTime}</td>
@@ -204,12 +210,12 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
}
private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
- attemptRow(info, info.attempts.head, true)
+ attemptRow(false, info, info.attempts.head, true)
}
private def appWithAttemptRow(info: ApplicationHistoryInfo): Seq[Node] = {
- attemptRow(info, info.attempts.head, true) ++
- info.attempts.drop(1).flatMap(attemptRow(info, _, false))
+ attemptRow(true, info, info.attempts.head, true) ++
+ info.attempts.drop(1).flatMap(attemptRow(true, info, _, false))
}
private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = {
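
For illustration, a self-contained sketch (hypothetical names, plain strings in place of the scala.xml nodes the page really emits) of how the two helpers drive attemptRow: only the first attempt of an app carries the app-ID and name cells, and the attempt-ID cell is rendered only in the multi-attempt table.

    // Hypothetical stand-ins for ApplicationHistoryInfo / ApplicationAttemptInfo.
    case class Attempt(attemptId: String, name: String)
    case class App(id: String, attempts: Seq[Attempt])

    // One table row per attempt; cells are modeled as bracketed strings.
    def attemptCells(app: App, a: Attempt, isFirst: Boolean, renderAttemptId: Boolean): String = {
      val idCells = if (isFirst) s"[${app.id}] [${a.name}]" else ""
      val attemptCell = if (renderAttemptId) s" [attempt ${a.attemptId}]" else ""
      idCells + attemptCell
    }

    def appRow(app: App): Seq[String] =
      Seq(attemptCells(app, app.attempts.head, isFirst = true, renderAttemptId = false))

    def appWithAttemptRow(app: App): Seq[String] =
      attemptCells(app, app.attempts.head, isFirst = true, renderAttemptId = true) +:
        app.attempts.drop(1).map(a => attemptCells(app, a, isFirst = false, renderAttemptId = true))
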
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index b7b4355351619..0761d2846b854 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -80,18 +80,23 @@ class HistoryServer(
return
}
- val appId = parts(1)
+ val appKey =
+ if (parts.length == 3) {
+ s"${parts(1)}/${parts(2)}"
+ } else {
+ parts(1)
+ }
// Note we don't use the UI retrieved from the cache; the cache loader above will register
// the app's UI, and all we need to do is redirect the user to the same URI that was
// requested, and the proper data should be served at that point.
try {
- appCache.get(appId)
+ appCache.get(appKey)
res.sendRedirect(res.encodeRedirectURL(req.getRequestURI()))
} catch {
case e: Exception => e.getCause() match {
case nsee: NoSuchElementException =>
- val msg = <div class="row-fluid">Application {appId} not found.</div>
+ val msg = <div class="row-fluid">Application {appKey} not found.</div>
res.setStatus(HttpServletResponse.SC_NOT_FOUND)
UIUtils.basicSparkPage(msg, "Not Found").foreach(
n => res.getWriter().write(n.toString))
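
For illustration, the key derivation above as a standalone snippet (helper name is hypothetical); parts comes from splitting the servlet path under /history, so /app-123/1 maps to the cache key app-123/1 while /app-123 maps to app-123.

    // Standalone sketch of the appKey logic; pathInfo is the servlet path
    // after "/history", e.g. "/app-123" or "/app-123/1".
    def appCacheKey(pathInfo: String): String = {
      val parts = pathInfo.split("/")      // "/app-123/1" -> Array("", "app-123", "1")
      if (parts.length == 3) s"${parts(1)}/${parts(2)}" else parts(1)
    }

    // appCacheKey("/app-123")   == "app-123"
    // appCacheKey("/app-123/1") == "app-123/1"
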
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index c75c7a53afefa..238694324c19f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -55,8 +55,8 @@ private[spark] class EventLoggingListener(
import EventLoggingListener._
- def this(appId: String, logBaseDir: URI, sparkConf: SparkConf) =
- this(appId, "", logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
+ def this(appId: String, appAttemptId: String, logBaseDir: URI, sparkConf: SparkConf) =
+ this(appId, appAttemptId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
@@ -253,9 +253,12 @@ private[spark] object EventLoggingListener extends Logging {
* we won't know which codec to use to decompress the metadata needed to open the file in
* the first place.
*
+ * The log file name will identify the compression codec used for the contents, if any.
+ * For example, app_123 for an uncompressed log, app_123.lzf for an LZF-compressed log.
+ *
* @param logBaseDir Directory where the log file will be written.
* @param appId A unique app ID.
- * @param appAttemptId A unique attempt id of appId.
+ * @param appAttemptId A unique attempt id of appId. May be the empty string.
* @param compressionCodecName Name to identify the codec used to compress the contents
* of the log, or None if compression is not enabled.
* @return A path which consists of file-system-safe characters.
@@ -265,14 +268,20 @@ private[spark] object EventLoggingListener extends Logging {
appId: String,
appAttemptId: String,
compressionCodecName: Option[String] = None): String = {
- val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
- if (appAttemptId.equals("")) {
- logBaseDir.toString.stripSuffix("/") + "/" + name.stripSuffix("/")
+ val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId)
+ val codec = compressionCodecName.map("." + _).getOrElse("")
+ if (appAttemptId.isEmpty) {
+ base + codec
} else {
- logBaseDir.toString.stripSuffix("/") + "/" + name.stripSuffix("/") + "_" + appAttemptId
+ base + "_" + sanitize(appAttemptId) + codec
}
}
+ private def sanitize(str: String): String = {
+ str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
+ }
+
/**
* Opens an event log file and returns an input stream that contains the event data.
*
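
For illustration, a standalone sketch of the resulting naming scheme (helper names are hypothetical; the real method takes the base directory as a URI): the sanitized attempt ID, when present, is appended with an underscore, and the codec name becomes the file extension.

    // Standalone sketch of the event-log naming scheme above (not the Spark API itself).
    def sanitize(str: String): String =
      str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase

    def logPath(baseDir: String, appId: String, attemptId: String,
        codec: Option[String] = None): String = {
      val base = baseDir.stripSuffix("/") + "/" + sanitize(appId)
      val ext = codec.map("." + _).getOrElse("")
      if (attemptId.isEmpty) base + ext else base + "_" + sanitize(attemptId) + ext
    }

    // logPath("/logs", "App 1", "")               == "/logs/app-1"
    // logPath("/logs", "App 1", "2", Some("lzf")) == "/logs/app-1_2.lzf"
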
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index d612409c81b6f..9a64629eeb885 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -73,14 +73,14 @@ private[spark] trait TaskScheduler {
* @return An application ID
*/
def applicationId(): String = appId
-
+
/**
* Process a lost executor
*/
def executorLost(executorId: String, reason: ExecutorLossReason): Unit
/**
- * Get an application's attempt Id associated with the job.
+ * Get an application's attempt ID associated with the job.
*
* @return An application's Attempt ID
*/
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 23c18edf70b25..0a877a78714b2 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -195,7 +195,7 @@ private[spark] object JsonProtocol {
("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
("Timestamp" -> applicationStart.time) ~
("User" -> applicationStart.sparkUser) ~
- ("appAttemptId" -> applicationStart.appAttemptId)
+ ("appAttemptId" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing))
}
def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
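
For illustration, the Option handling above with the json4s DSL: a None attempt ID maps to JNothing, so the field is simply omitted from the rendered event (values here are illustrative).

    import org.json4s.JsonDSL._
    import org.json4s.JsonAST.{JNothing, JString}
    import org.json4s.jackson.JsonMethods.{compact, render}

    val attemptId: Option[String] = None
    val json = ("App ID" -> "app-123") ~
      ("appAttemptId" -> attemptId.map(JString(_)).getOrElse(JNothing))

    // compact(render(json)) == """{"App ID":"app-123"}"""
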
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index e9b59a0041f52..1d6c5c868e8f4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -61,7 +61,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
test("Verify log file exist") {
// Verify logging directory exists
val conf = getLoggingConf(testDirPath)
- val eventLogger = new EventLoggingListener("test", testDirPath.toUri(), conf)
+ val eventLogger = new EventLoggingListener("test", "", testDirPath.toUri(), conf)
eventLogger.start()
val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
@@ -140,7 +140,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
val conf = getLoggingConf(testDirPath, compressionCodec)
extraConf.foreach { case (k, v) => conf.set(k, v) }
val logName = compressionCodec.map("test-" + _).getOrElse("test")
- val eventLogger = new EventLoggingListener(logName, testDirPath.toUri(), conf)
+ val eventLogger = new EventLoggingListener(logName, "", testDirPath.toUri(), conf)
val listenerBus = new LiveListenerBus
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
125L, "Mickey", None)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 3cc2a9f5570b1..e14a70d353a3f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -91,10 +91,9 @@ private[spark] class ApplicationMaster(
// Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
- // Propagate the attempt if, so that in case of event logging,
- // different attempt's logs gets created in different directory
- System.setProperty("spark.yarn.app.attemptid", appAttemptId.getAttemptId().toString())
-
+ // Propagate the attempt ID, so that in case of event logging,
+ // different attempts' logs get created in different directories
+ System.setProperty("spark.yarn.app.attemptid", appAttemptId.getAttemptId().toString())
}
logInfo("ApplicationAttemptId: " + appAttemptId)
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index f20f4dcb00d64..f022e8296fa66 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -46,7 +46,7 @@ private[spark] class YarnClusterSchedulerBackend(
logError("Application ID is not set.")
super.applicationId
}
-
+
override def applicationAttemptId(): String =
// In YARN Cluster mode, spark.yarn.app.attemptid is expected to be set
// before user application is launched.
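
For reference, a sketch of the read side (the method body falls outside the hunk above): spark.* system properties are absorbed into SparkConf, so the cluster-mode backend can resolve the attempt ID with something like:

    // Sketch only; assumes a SparkConf that has picked up spark.* system properties.
    def applicationAttemptId(conf: org.apache.spark.SparkConf): String =
      conf.get("spark.yarn.app.attemptid")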