From e9be43f6bdd2d70db8d44bb38177b869da3fbb7a Mon Sep 17 00:00:00 2001 From: Sanket Date: Tue, 9 May 2017 09:30:09 -0500 Subject: [PATCH] [SPARK-20355] Add per application spark version on the history server headerpage ## What changes were proposed in this pull request? Spark Version for a specific application is not displayed on the history page now. It should be nice to switch the spark version on the UI when we click on the specific application. Currently there seems to be way as SparkListenerLogStart records the application version. So, it should be trivial to listen to this event and provision this change on the UI. For Example screen shot 2017-04-06 at 3 23 41 pm screen shot 2017-04-17 at 9 59 33 am {"Event":"SparkListenerLogStart","Spark Version":"2.0.0"} (Please fill in changes proposed in this fix) Modified the SparkUI for History server to listen to SparkLogListenerStart event and extract the version and print it. ## How was this patch tested? Manual testing of UI page. Attaching the UI screenshot changes here (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Sanket Closes #17658 from redsanket/SPARK-20355. --- .../history/ApplicationHistoryProvider.scala | 3 ++- .../deploy/history/FsHistoryProvider.scala | 17 ++++++++++++----- .../scheduler/ApplicationEventListener.scala | 7 +++++++ .../spark/scheduler/EventLoggingListener.scala | 13 ++++++++++--- .../apache/spark/scheduler/SparkListener.scala | 4 ++-- .../spark/scheduler/SparkListenerBus.scala | 1 - .../status/api/v1/ApplicationListResource.scala | 3 ++- .../org/apache/spark/status/api/v1/api.scala | 3 ++- .../scala/org/apache/spark/ui/SparkUI.scala | 6 +++++- .../scala/org/apache/spark/ui/UIUtils.scala | 2 +- .../application_list_json_expectation.json | 10 ++++++++++ .../completed_app_list_json_expectation.json | 11 +++++++++++ .../limit_app_list_json_expectation.json | 3 +++ .../maxDate2_app_list_json_expectation.json | 1 + .../maxDate_app_list_json_expectation.json | 2 ++ .../maxEndDate_app_list_json_expectation.json | 7 +++++++ ...nd_maxEndDate_app_list_json_expectation.json | 4 ++++ .../minDate_app_list_json_expectation.json | 8 ++++++++ ...nd_maxEndDate_app_list_json_expectation.json | 4 ++++ .../minEndDate_app_list_json_expectation.json | 6 +++++- .../one_app_json_expectation.json | 1 + .../one_app_multi_attempt_json_expectation.json | 2 ++ .../deploy/history/ApplicationCacheSuite.scala | 2 +- .../deploy/history/FsHistoryProviderSuite.scala | 4 ++-- project/MimaExcludes.scala | 3 +++ 25 files changed, 107 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index 6d8758a3d3b1d..5cb48ca3e60b0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -30,7 +30,8 @@ private[spark] case class ApplicationAttemptInfo( endTime: Long, lastUpdated: Long, sparkUser: String, - completed: Boolean = false) + completed: Boolean = false, + appSparkVersion: String) private[spark] case class ApplicationHistoryInfo( id: String, diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index f4235df245128..d05ca142b618b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -248,7 +248,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val conf = this.conf.clone() val appSecManager = new SecurityManager(conf) SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name, - HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime) + HistoryServer.getAttemptURI(appId, attempt.attemptId), + attempt.startTime) // Do not call ui.bind() to avoid creating a new server for each application } @@ -257,6 +258,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) if (appListener.appId.isDefined) { + ui.appSparkVersion = appListener.appSparkVersion.getOrElse("") ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) // make sure to set admin acls before view acls so they are properly picked up val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("") @@ -443,7 +445,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val newAttempts = try { val eventsFilter: ReplayEventsFilter = { eventString => eventString.startsWith(APPL_START_EVENT_PREFIX) || - eventString.startsWith(APPL_END_EVENT_PREFIX) + eventString.startsWith(APPL_END_EVENT_PREFIX) || + eventString.startsWith(LOG_START_EVENT_PREFIX) } val logPath = fileStatus.getPath() @@ -469,7 +472,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) lastUpdated, appListener.sparkUser.getOrElse(NOT_STARTED), appCompleted, - fileStatus.getLen() + fileStatus.getLen(), + appListener.appSparkVersion.getOrElse("") ) fileToAppInfo(logPath) = attemptInfo logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo") @@ -735,6 +739,8 @@ private[history] object FsHistoryProvider { private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\"" private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\"" + + private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\"" } /** @@ -762,9 +768,10 @@ private class FsApplicationAttemptInfo( lastUpdated: Long, sparkUser: String, completed: Boolean, - val fileSize: Long) + val fileSize: Long, + appSparkVersion: String) extends ApplicationAttemptInfo( - attemptId, startTime, endTime, lastUpdated, sparkUser, completed) { + attemptId, startTime, endTime, lastUpdated, sparkUser, completed, appSparkVersion) { /** extend the superclass string value with the extra attributes of this class */ override def toString: String = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala index 28c45d800ed06..6da8865cd10d3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala @@ -34,6 +34,7 @@ private[spark] class ApplicationEventListener extends SparkListener { var adminAcls: Option[String] = None var viewAclsGroups: Option[String] = None var adminAclsGroups: Option[String] = None + var appSparkVersion: Option[String] = None override def onApplicationStart(applicationStart: SparkListenerApplicationStart) { appName = Some(applicationStart.appName) @@ -57,4 +58,10 @@ private[spark] class ApplicationEventListener extends SparkListener { adminAclsGroups = allProperties.get("spark.admin.acls.groups") } } + + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { + case SparkListenerLogStart(sparkVersion) => + appSparkVersion = Some(sparkVersion) + case _ => + } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index a7dbf87915b27..f481436332249 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -119,7 +119,7 @@ private[spark] class EventLoggingListener( val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) val bstream = new BufferedOutputStream(cstream, outputBufferSize) - EventLoggingListener.initEventLog(bstream) + EventLoggingListener.initEventLog(bstream, testing, loggedEvents) fileSystem.setPermission(path, LOG_FILE_PERMISSIONS) writer = Some(new PrintWriter(bstream)) logInfo("Logging events to %s".format(logPath)) @@ -283,10 +283,17 @@ private[spark] object EventLoggingListener extends Logging { * * @param logStream Raw output stream to the event log file. */ - def initEventLog(logStream: OutputStream): Unit = { + def initEventLog( + logStream: OutputStream, + testing: Boolean, + loggedEvents: ArrayBuffer[JValue]): Unit = { val metadata = SparkListenerLogStart(SPARK_VERSION) - val metadataJson = compact(JsonProtocol.logStartToJson(metadata)) + "\n" + val eventJson = JsonProtocol.logStartToJson(metadata) + val metadataJson = compact(eventJson) + "\n" logStream.write(metadataJson.getBytes(StandardCharsets.UTF_8)) + if (testing && loggedEvents != null) { + loggedEvents += eventJson + } } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index bc2e530716686..59f89a82a1da8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -160,9 +160,9 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent /** * An internal class that describes the metadata of an event log. - * This event is not meant to be posted to listeners downstream. */ -private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent +@DeveloperApi +case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent /** * Interface for creating history listeners defined in other modules like SQL, which are used to diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index 3ff363321e8c9..3b0d3b1b150fe 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -71,7 +71,6 @@ private[spark] trait SparkListenerBus listener.onNodeUnblacklisted(nodeUnblacklisted) case blockUpdated: SparkListenerBlockUpdated => listener.onBlockUpdated(blockUpdated) - case logStart: SparkListenerLogStart => // ignore event log metadata case _ => listener.onOtherEvent(event) } } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala index a0239266d8756..f039744e7f67f 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala @@ -90,7 +90,8 @@ private[spark] object ApplicationsListResource { }, lastUpdated = new Date(internalAttemptInfo.lastUpdated), sparkUser = internalAttemptInfo.sparkUser, - completed = internalAttemptInfo.completed + completed = internalAttemptInfo.completed, + appSparkVersion = internalAttemptInfo.appSparkVersion ) } ) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 56d8e51732ffd..f6203271f3cd2 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -38,7 +38,8 @@ class ApplicationAttemptInfo private[spark]( val lastUpdated: Date, val duration: Long, val sparkUser: String, - val completed: Boolean = false) { + val completed: Boolean = false, + val appSparkVersion: String) { def getStartTimeEpoch: Long = startTime.getTime def getEndTimeEpoch: Long = endTime.getTime def getLastUpdatedEpoch: Long = lastUpdated.getTime diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index bf4cf79e9faa3..f271c56021e95 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -60,6 +60,8 @@ private[spark] class SparkUI private ( var appId: String = _ + var appSparkVersion = org.apache.spark.SPARK_VERSION + private var streamingJobProgressListener: Option[SparkListener] = None /** Initialize all components of the server. */ @@ -118,7 +120,8 @@ private[spark] class SparkUI private ( duration = 0, lastUpdated = new Date(startTime), sparkUser = getSparkUser, - completed = false + completed = false, + appSparkVersion = appSparkVersion )) )) } @@ -139,6 +142,7 @@ private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String) def appName: String = parent.appName + def appSparkVersion: String = parent.appSparkVersion } private[spark] object SparkUI { diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 79b0d81af52b5..8e1aafa448bc4 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -228,7 +228,7 @@ private[spark] object UIUtils extends Logging {