Skip to content

Commit

Permalink
[SPARK-20355] Add per application spark version on the history server…
Browse files Browse the repository at this point in the history
… headerpage

## What changes were proposed in this pull request?

Spark Version for a specific application is not displayed on the history page now. It should be nice to switch the spark version on the UI when we click on the specific application.
Currently there seems to be way as SparkListenerLogStart records the application version. So, it should be trivial to listen to this event and provision this change on the UI.
For Example
<img width="1439" alt="screen shot 2017-04-06 at 3 23 41 pm" src="https://cloud.githubusercontent.com/assets/8295799/25092650/41f3970a-2354-11e7-9b0d-4646d0adeb61.png">
<img width="1399" alt="screen shot 2017-04-17 at 9 59 33 am" src="https://cloud.githubusercontent.com/assets/8295799/25092743/9f9e2f28-2354-11e7-9605-f2f1c63f21fe.png">

{"Event":"SparkListenerLogStart","Spark Version":"2.0.0"}
(Please fill in changes proposed in this fix)
Modified the SparkUI for History server to listen to SparkLogListenerStart event and extract the version and print it.

## How was this patch tested?
Manual testing of UI page. Attaching the UI screenshot changes here

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Sanket <schintap@untilservice-lm>

Closes #17658 from redsanket/SPARK-20355.
  • Loading branch information
Sanket authored and Tom Graves committed May 9, 2017
1 parent 714811d commit 181261a
Show file tree
Hide file tree
Showing 25 changed files with 107 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ private[spark] case class ApplicationAttemptInfo(
endTime: Long,
lastUpdated: Long,
sparkUser: String,
completed: Boolean = false)
completed: Boolean = false,
appSparkVersion: String)

private[spark] case class ApplicationHistoryInfo(
id: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val conf = this.conf.clone()
val appSecManager = new SecurityManager(conf)
SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name,
HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime)
HistoryServer.getAttemptURI(appId, attempt.attemptId),
attempt.startTime)
// Do not call ui.bind() to avoid creating a new server for each application
}

Expand All @@ -257,6 +258,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)

if (appListener.appId.isDefined) {
ui.appSparkVersion = appListener.appSparkVersion.getOrElse("")
ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE)
// make sure to set admin acls before view acls so they are properly picked up
val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("")
Expand Down Expand Up @@ -443,7 +445,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val newAttempts = try {
val eventsFilter: ReplayEventsFilter = { eventString =>
eventString.startsWith(APPL_START_EVENT_PREFIX) ||
eventString.startsWith(APPL_END_EVENT_PREFIX)
eventString.startsWith(APPL_END_EVENT_PREFIX) ||
eventString.startsWith(LOG_START_EVENT_PREFIX)
}

val logPath = fileStatus.getPath()
Expand All @@ -469,7 +472,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
lastUpdated,
appListener.sparkUser.getOrElse(NOT_STARTED),
appCompleted,
fileStatus.getLen()
fileStatus.getLen(),
appListener.appSparkVersion.getOrElse("")
)
fileToAppInfo(logPath) = attemptInfo
logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo")
Expand Down Expand Up @@ -735,6 +739,8 @@ private[history] object FsHistoryProvider {
private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\""

private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\""

private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\""
}

/**
Expand Down Expand Up @@ -762,9 +768,10 @@ private class FsApplicationAttemptInfo(
lastUpdated: Long,
sparkUser: String,
completed: Boolean,
val fileSize: Long)
val fileSize: Long,
appSparkVersion: String)
extends ApplicationAttemptInfo(
attemptId, startTime, endTime, lastUpdated, sparkUser, completed) {
attemptId, startTime, endTime, lastUpdated, sparkUser, completed, appSparkVersion) {

/** extend the superclass string value with the extra attributes of this class */
override def toString: String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ private[spark] class ApplicationEventListener extends SparkListener {
var adminAcls: Option[String] = None
var viewAclsGroups: Option[String] = None
var adminAclsGroups: Option[String] = None
var appSparkVersion: Option[String] = None

override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
appName = Some(applicationStart.appName)
Expand All @@ -57,4 +58,10 @@ private[spark] class ApplicationEventListener extends SparkListener {
adminAclsGroups = allProperties.get("spark.admin.acls.groups")
}
}

override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
case SparkListenerLogStart(sparkVersion) =>
appSparkVersion = Some(sparkVersion)
case _ =>
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private[spark] class EventLoggingListener(
val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
val bstream = new BufferedOutputStream(cstream, outputBufferSize)

EventLoggingListener.initEventLog(bstream)
EventLoggingListener.initEventLog(bstream, testing, loggedEvents)
fileSystem.setPermission(path, LOG_FILE_PERMISSIONS)
writer = Some(new PrintWriter(bstream))
logInfo("Logging events to %s".format(logPath))
Expand Down Expand Up @@ -283,10 +283,17 @@ private[spark] object EventLoggingListener extends Logging {
*
* @param logStream Raw output stream to the event log file.
*/
def initEventLog(logStream: OutputStream): Unit = {
def initEventLog(
logStream: OutputStream,
testing: Boolean,
loggedEvents: ArrayBuffer[JValue]): Unit = {
val metadata = SparkListenerLogStart(SPARK_VERSION)
val metadataJson = compact(JsonProtocol.logStartToJson(metadata)) + "\n"
val eventJson = JsonProtocol.logStartToJson(metadata)
val metadataJson = compact(eventJson) + "\n"
logStream.write(metadataJson.getBytes(StandardCharsets.UTF_8))
if (testing && loggedEvents != null) {
loggedEvents += eventJson
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent

/**
* An internal class that describes the metadata of an event log.
* This event is not meant to be posted to listeners downstream.
*/
private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent

/**
* Interface for creating history listeners defined in other modules like SQL, which are used to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ private[spark] trait SparkListenerBus
listener.onNodeUnblacklisted(nodeUnblacklisted)
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
case logStart: SparkListenerLogStart => // ignore event log metadata
case _ => listener.onOtherEvent(event)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ private[spark] object ApplicationsListResource {
},
lastUpdated = new Date(internalAttemptInfo.lastUpdated),
sparkUser = internalAttemptInfo.sparkUser,
completed = internalAttemptInfo.completed
completed = internalAttemptInfo.completed,
appSparkVersion = internalAttemptInfo.appSparkVersion
)
}
)
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/status/api/v1/api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class ApplicationAttemptInfo private[spark](
val lastUpdated: Date,
val duration: Long,
val sparkUser: String,
val completed: Boolean = false) {
val completed: Boolean = false,
val appSparkVersion: String) {
def getStartTimeEpoch: Long = startTime.getTime
def getEndTimeEpoch: Long = endTime.getTime
def getLastUpdatedEpoch: Long = lastUpdated.getTime
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ private[spark] class SparkUI private (

var appId: String = _

var appSparkVersion = org.apache.spark.SPARK_VERSION

private var streamingJobProgressListener: Option[SparkListener] = None

/** Initialize all components of the server. */
Expand Down Expand Up @@ -118,7 +120,8 @@ private[spark] class SparkUI private (
duration = 0,
lastUpdated = new Date(startTime),
sparkUser = getSparkUser,
completed = false
completed = false,
appSparkVersion = appSparkVersion
))
))
}
Expand All @@ -139,6 +142,7 @@ private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String)

def appName: String = parent.appName

def appSparkVersion: String = parent.appSparkVersion
}

private[spark] object SparkUI {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ private[spark] object UIUtils extends Logging {
<div class="brand">
<a href={prependBaseUri("/")} class="brand">
<img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} />
<span class="version">{org.apache.spark.SPARK_VERSION}</span>
<span class="version">{activeTab.appSparkVersion}</span>
</a>
</div>
<p class="navbar-text pull-right">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"duration" : 10671,
"sparkUser" : "jose",
"completed" : true,
"appSparkVersion" : "2.1.0-SNAPSHOT",
"endTimeEpoch" : 1479335620587,
"startTimeEpoch" : 1479335609916,
"lastUpdatedEpoch" : 0
Expand All @@ -22,6 +23,7 @@
"duration" : 101795,
"sparkUser" : "jose",
"completed" : true,
"appSparkVersion" : "2.1.0-SNAPSHOT",
"endTimeEpoch" : 1479252138874,
"startTimeEpoch" : 1479252037079,
"lastUpdatedEpoch" : 0
Expand All @@ -36,6 +38,7 @@
"duration" : 10505,
"sparkUser" : "irashid",
"completed" : true,
"appSparkVersion" : "1.4.0-SNAPSHOT",
"endTimeEpoch" : 1430917391398,
"startTimeEpoch" : 1430917380893,
"lastUpdatedEpoch" : 0
Expand All @@ -51,6 +54,7 @@
"duration" : 57,
"sparkUser" : "irashid",
"completed" : true,
"appSparkVersion" : "1.4.0-SNAPSHOT",
"endTimeEpoch" : 1430917380950,
"startTimeEpoch" : 1430917380893,
"lastUpdatedEpoch" : 0
Expand All @@ -62,6 +66,7 @@
"duration" : 10,
"sparkUser" : "irashid",
"completed" : true,
"appSparkVersion" : "1.4.0-SNAPSHOT",
"endTimeEpoch" : 1430917380890,
"startTimeEpoch" : 1430917380880,
"lastUpdatedEpoch" : 0
Expand All @@ -77,6 +82,7 @@
"duration" : 34935,
"sparkUser" : "irashid",
"completed" : true,
"appSparkVersion" : "",
"endTimeEpoch" : 1426633945177,
"startTimeEpoch" : 1426633910242,
"lastUpdatedEpoch" : 0
Expand All @@ -88,6 +94,7 @@
"duration" : 34935,
"sparkUser" : "irashid",
"completed" : true,
"appSparkVersion" : "",
"endTimeEpoch" : 1426533945177,
"startTimeEpoch" : 1426533910242,
"lastUpdatedEpoch" : 0
Expand All @@ -102,6 +109,7 @@
"duration" : 8635,
"sparkUser" : "irashid",
"completed" : true,
"appSparkVersion" : "",
"endTimeEpoch" : 1425081766912,
"startTimeEpoch" : 1425081758277,
"lastUpdatedEpoch" : 0
Expand All @@ -116,6 +124,7 @@
"duration" : 9011,
"sparkUser" : "irashid",
"completed" : true,
"appSparkVersion" : "",
"endTimeEpoch" : 1422981788731,
"startTimeEpoch" : 1422981779720,
"lastUpdatedEpoch" : 0
Expand All @@ -130,6 +139,7 @@
"duration" : 8635,
"sparkUser" : "irashid",
"completed" : true,
"appSparkVersion" : "",
"endTimeEpoch" : 1422981766912,
"startTimeEpoch" : 1422981758277,
"lastUpdatedEpoch" : 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"duration" : 10671,
"sparkUser" : "jose",
"completed" : true,
"appSparkVersion" : "2.1.0-SNAPSHOT",
"endTimeEpoch" : 1479335620587,
"startTimeEpoch" : 1479335609916,
"lastUpdatedEpoch" : 0
Expand All @@ -22,6 +23,7 @@
"duration" : 101795,
"sparkUser" : "jose",
"completed" : true,
"appSparkVersion" : "2.1.0-SNAPSHOT",
"endTimeEpoch" : 1479252138874,
"startTimeEpoch" : 1479252037079,
"lastUpdatedEpoch" : 0
Expand All @@ -36,6 +38,7 @@
"duration" : 10505,
"sparkUser" : "irashid",
"completed" : true,
"appSparkVersion" : "1.4.0-SNAPSHOT",
"endTimeEpoch" : 1430917391398,
"startTimeEpoch" : 1430917380893,
"lastUpdatedEpoch" : 0
Expand All @@ -51,6 +54,7 @@
"duration" : 57,
"sparkUser" : "irashid",
"completed" : true,
"appSparkVersion" : "1.4.0-SNAPSHOT",
"endTimeEpoch" : 1430917380950,
"startTimeEpoch" : 1430917380893,
"lastUpdatedEpoch" : 0
Expand All @@ -62,6 +66,7 @@
"duration" : 10,
"sparkUser" : "irashid",
"completed" : true,
"appSparkVersion" : "1.4.0-SNAPSHOT",
"endTimeEpoch" : 1430917380890,
"startTimeEpoch" : 1430917380880,
"lastUpdatedEpoch" : 0
Expand All @@ -77,6 +82,7 @@
"duration" : 34935,
"sparkUser" : "irashid",
"completed" : true,
"appSparkVersion" : "",
"endTimeEpoch" : 1426633945177,
"startTimeEpoch" : 1426633910242,
"lastUpdatedEpoch" : 0
Expand All @@ -88,6 +94,7 @@
"duration" : 34935,
"sparkUser" : "irashid",
"completed" : true,
"appSparkVersion" : "",
"endTimeEpoch" : 1426533945177,
"startTimeEpoch" : 1426533910242,
"lastUpdatedEpoch" : 0
Expand All @@ -102,6 +109,8 @@
"duration" : 8635,
"sparkUser" : "irashid",
"completed" : true,
"appSparkVersion" : "",
"appSparkVersion" : "",
"endTimeEpoch" : 1425081766912,
"startTimeEpoch" : 1425081758277,
"lastUpdatedEpoch" : 0
Expand All @@ -116,6 +125,7 @@
"duration" : 9011,
"sparkUser" : "irashid",
"completed" : true,
"appSparkVersion" : "",
"endTimeEpoch" : 1422981788731,
"startTimeEpoch" : 1422981779720,
"lastUpdatedEpoch" : 0
Expand All @@ -130,6 +140,7 @@
"duration" : 8635,
"sparkUser" : "irashid",
"completed" : true,
"appSparkVersion" : "",
"endTimeEpoch" : 1422981766912,
"startTimeEpoch" : 1422981758277,
"lastUpdatedEpoch" : 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"duration" : 10671,
"sparkUser" : "jose",
"completed" : true,
"appSparkVersion" : "2.1.0-SNAPSHOT",
"endTimeEpoch" : 1479335620587,
"startTimeEpoch" : 1479335609916,
"lastUpdatedEpoch" : 0
Expand All @@ -22,6 +23,7 @@
"duration" : 101795,
"sparkUser" : "jose",
"completed" : true,
"appSparkVersion" : "2.1.0-SNAPSHOT",
"endTimeEpoch" : 1479252138874,
"startTimeEpoch" : 1479252037079,
"lastUpdatedEpoch" : 0
Expand All @@ -36,6 +38,7 @@
"duration" : 10505,
"sparkUser" : "irashid",
"completed" : true,
"appSparkVersion" : "1.4.0-SNAPSHOT",
"endTimeEpoch" : 1430917391398,
"startTimeEpoch" : 1430917380893,
"lastUpdatedEpoch" : 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"duration" : 8635,
"sparkUser" : "irashid",
"completed" : true,
"appSparkVersion" : "",
"endTimeEpoch" : 1422981766912,
"startTimeEpoch" : 1422981758277,
"lastUpdatedEpoch" : 0
Expand Down
Loading

0 comments on commit 181261a

Please sign in to comment.