-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-20355] Add per application spark version on the history server headerpage #17658
Changes from 12 commits
1f50b27
21f7903
bad5ea2
5f6e80b
097e7cc
5533775
7350649
e7da27d
f1fa890
298248e
bafad1a
52f414c
dad87a6
93e7a20
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -248,7 +248,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) | |
val conf = this.conf.clone() | ||
val appSecManager = new SecurityManager(conf) | ||
SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name, | ||
HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime) | ||
HistoryServer.getAttemptURI(appId, attempt.attemptId), | ||
attempt.startTime) | ||
// Do not call ui.bind() to avoid creating a new server for each application | ||
} | ||
|
||
|
@@ -257,6 +258,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) | |
val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Sorry about it, I just saw that you moved these two lines above. |
||
|
||
if (appListener.appId.isDefined) { | ||
ui.setAppSparkVersion(appListener.appSparkVersion.getOrElse("")) | ||
ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) | ||
// make sure to set admin acls before view acls so they are properly picked up | ||
val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("") | ||
|
@@ -469,7 +471,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) | |
lastUpdated, | ||
appListener.sparkUser.getOrElse(NOT_STARTED), | ||
appCompleted, | ||
fileStatus.getLen() | ||
fileStatus.getLen(), | ||
appListener.appSparkVersion.getOrElse("") | ||
) | ||
fileToAppInfo(logPath) = attemptInfo | ||
logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo") | ||
|
@@ -762,9 +765,10 @@ private class FsApplicationAttemptInfo( | |
lastUpdated: Long, | ||
sparkUser: String, | ||
completed: Boolean, | ||
val fileSize: Long) | ||
val fileSize: Long, | ||
appSparkVersion: String) | ||
extends ApplicationAttemptInfo( | ||
attemptId, startTime, endTime, lastUpdated, sparkUser, completed) { | ||
attemptId, startTime, endTime, lastUpdated, sparkUser, completed, appSparkVersion) { | ||
|
||
/** extend the superclass string value with the extra attributes of this class */ | ||
override def toString: String = { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,7 @@ private[spark] class ApplicationEventListener extends SparkListener { | |
var adminAcls: Option[String] = None | ||
var viewAclsGroups: Option[String] = None | ||
var adminAclsGroups: Option[String] = None | ||
var appSparkVersion: Option[String] = None | ||
|
||
override def onApplicationStart(applicationStart: SparkListenerApplicationStart) { | ||
appName = Some(applicationStart.appName) | ||
|
@@ -57,4 +58,10 @@ private[spark] class ApplicationEventListener extends SparkListener { | |
adminAclsGroups = allProperties.get("spark.admin.acls.groups") | ||
} | ||
} | ||
|
||
override def onOtherEvent(event: SparkListenerEvent): Unit = event match { | ||
case SparkListenerLogStart(sparkVersion) => | ||
appSparkVersion = Some(sparkVersion) | ||
case _ => | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You need a "catch all" here:
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -119,7 +119,7 @@ private[spark] class EventLoggingListener( | |
val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream) | ||
val bstream = new BufferedOutputStream(cstream, outputBufferSize) | ||
|
||
EventLoggingListener.initEventLog(bstream) | ||
EventLoggingListener.initEventLog(bstream, testing, loggedEvents) | ||
fileSystem.setPermission(path, LOG_FILE_PERMISSIONS) | ||
writer = Some(new PrintWriter(bstream)) | ||
logInfo("Logging events to %s".format(logPath)) | ||
|
@@ -283,10 +283,15 @@ private[spark] object EventLoggingListener extends Logging { | |
* | ||
* @param logStream Raw output stream to the event log file. | ||
*/ | ||
def initEventLog(logStream: OutputStream): Unit = { | ||
def initEventLog(logStream: OutputStream, testing: Boolean, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: style See style for multi-line arguments in |
||
loggedEvents: ArrayBuffer[JValue]): Unit = { | ||
val metadata = SparkListenerLogStart(SPARK_VERSION) | ||
val metadataJson = compact(JsonProtocol.logStartToJson(metadata)) + "\n" | ||
val eventJson = JsonProtocol.logStartToJson(metadata) | ||
val metadataJson = compact(eventJson) + "\n" | ||
logStream.write(metadataJson.getBytes(StandardCharsets.UTF_8)) | ||
if (testing && loggedEvents != null) { | ||
loggedEvents += eventJson | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of this, how about returning the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought the loggedEvents only takes json value. Also the loggedEvents are generated here as a part of spark context and probably through other sources. The ReplayListenerSuite however tests the original events with the replay events (here the replay events are written to the event log but however the loggedEvents will not have the SparkListenerLogStart event, as this is not a part of SparkContext if I understand it correctly). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The downside to returning the event is that this function can't do more in the future. For instance if it wants to add 2 events. I guess its private so it doesn't matter to much and its just for testing stuff. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My main concern is that it makes calling this method from other places awkward. (i.e. the semantics of the new arguments are not clear.) In any case, there's a single other call to this method, in a test suite, so probably ok. |
||
} | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -160,9 +160,9 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent | |
|
||
/** | ||
* An internal class that describes the metadata of an event log. | ||
* This event is not meant to be posted to listeners downstream. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason this note was here? What was the reasoning behind it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was only for metadata info, so when this was written it was just not meant to be consumed but now we can reuse it for this case |
||
*/ | ||
private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent | ||
@DeveloperApi | ||
case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent | ||
|
||
/** | ||
* Interface for creating history listeners defined in other modules like SQL, which are used to | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -71,7 +71,6 @@ private[spark] trait SparkListenerBus | |
listener.onNodeUnblacklisted(nodeUnblacklisted) | ||
case blockUpdated: SparkListenerBlockUpdated => | ||
listener.onBlockUpdated(blockUpdated) | ||
case logStart: SparkListenerLogStart => // ignore event log metadata | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I haven't looked at this closely, but will allowing this through cause any issues? Why was it ignored before? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not see it getting consumed apart from registering some metadata, so I guess it should be fine as this event already logs the version There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this change the behavior of
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think that if this is gonna be posted on the bus it might be better to make it public and `@DeveloperApi`. Adding new events is not an issue, but posting an event that people can't handle is a little odd. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I am not too sure if it will change any behavior, and that is precisely why we post it like other events: in case someone wants to listen to them and utilize the event, like in this scenario.
||
case _ => listener.onOtherEvent(event) | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -60,6 +60,8 @@ private[spark] class SparkUI private ( | |
|
||
var appId: String = _ | ||
|
||
var appSparkVersion = org.apache.spark.SPARK_VERSION | ||
|
||
private var streamingJobProgressListener: Option[SparkListener] = None | ||
|
||
/** Initialize all components of the server. */ | ||
|
@@ -93,6 +95,10 @@ private[spark] class SparkUI private ( | |
appId = id | ||
} | ||
|
||
def setAppSparkVersion(version: String): Unit = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
appSparkVersion = version | ||
} | ||
|
||
/** Stop the server behind this web interface. Only valid after bind(). */ | ||
override def stop() { | ||
super.stop() | ||
|
@@ -118,7 +124,8 @@ private[spark] class SparkUI private ( | |
duration = 0, | ||
lastUpdated = new Date(startTime), | ||
sparkUser = getSparkUser, | ||
completed = false | ||
completed = false, | ||
appSparkVersion = "" | ||
)) | ||
)) | ||
} | ||
|
@@ -139,6 +146,7 @@ private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String) | |
|
||
def appName: String = parent.appName | ||
|
||
def appSparkVersion: String = parent.appSparkVersion | ||
} | ||
|
||
private[spark] object SparkUI { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ | |
"duration" : 10671, | ||
"sparkUser" : "jose", | ||
"completed" : true, | ||
"appSparkVersion" : "", | ||
"endTimeEpoch" : 1479335620587, | ||
"startTimeEpoch" : 1479335609916, | ||
"lastUpdatedEpoch" : 0 | ||
|
@@ -22,6 +23,7 @@ | |
"duration" : 101795, | ||
"sparkUser" : "jose", | ||
"completed" : true, | ||
"appSparkVersion" : "", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't look correct. If I look at the log file for this app (
So this value should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure the if the tests hit this code path https://github.com/apache/spark/pull/17658/files#diff-a7befb99e7bd7e3ab5c46c2568aa5b3eR474, so they take the default value There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. probably I could change the default value, ok will do it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not really about the default value; these tests replay the log files, which contain the Spark version, so I would expect the data retrieved through the API to contain the version that was recorded in the event log. Another way of saying that probably there's a bug somewhere in your code that is preventing the data from the event log from being exposed correctly through the REST API. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see events being passed to ApplicationEventListener in debug logs but interestingly I doPostEvent seems to be not posting events to the listener to listen to the event which is a bit odd, not sure I need to add a change in contract for the tests to pick this up though 17/05/05 16:39:42.235 qtp1618269752-1127 INFO ReplayListenerBus: listener --- event org.apache.spark.ui.jobs.JobProgressListener@53e58df ---SparkListenerLogStart(2.3.0-SNAPSHOT) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I know what this is.
It only allows those two events through currently; you probably need to add the log start event to that list too. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ok doPost is posting the events There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. aah ok the UI will work because it is replaying but the rest end point would break as it is not allowing it to pass through, thanks @vanzin |
||
"endTimeEpoch" : 1479252138874, | ||
"startTimeEpoch" : 1479252037079, | ||
"lastUpdatedEpoch" : 0 | ||
|
@@ -36,6 +38,7 @@ | |
"duration" : 10505, | ||
"sparkUser" : "irashid", | ||
"completed" : true, | ||
"appSparkVersion" : "", | ||
"endTimeEpoch" : 1430917391398, | ||
"startTimeEpoch" : 1430917380893, | ||
"lastUpdatedEpoch" : 0 | ||
|
@@ -51,6 +54,7 @@ | |
"duration" : 57, | ||
"sparkUser" : "irashid", | ||
"completed" : true, | ||
"appSparkVersion" : "", | ||
"endTimeEpoch" : 1430917380950, | ||
"startTimeEpoch" : 1430917380893, | ||
"lastUpdatedEpoch" : 0 | ||
|
@@ -62,6 +66,7 @@ | |
"duration" : 10, | ||
"sparkUser" : "irashid", | ||
"completed" : true, | ||
"appSparkVersion" : "", | ||
"endTimeEpoch" : 1430917380890, | ||
"startTimeEpoch" : 1430917380880, | ||
"lastUpdatedEpoch" : 0 | ||
|
@@ -77,6 +82,7 @@ | |
"duration" : 34935, | ||
"sparkUser" : "irashid", | ||
"completed" : true, | ||
"appSparkVersion" : "", | ||
"endTimeEpoch" : 1426633945177, | ||
"startTimeEpoch" : 1426633910242, | ||
"lastUpdatedEpoch" : 0 | ||
|
@@ -88,6 +94,7 @@ | |
"duration" : 34935, | ||
"sparkUser" : "irashid", | ||
"completed" : true, | ||
"appSparkVersion" : "", | ||
"endTimeEpoch" : 1426533945177, | ||
"startTimeEpoch" : 1426533910242, | ||
"lastUpdatedEpoch" : 0 | ||
|
@@ -102,6 +109,8 @@ | |
"duration" : 8635, | ||
"sparkUser" : "irashid", | ||
"completed" : true, | ||
"appSparkVersion" : "", | ||
"appSparkVersion" : "", | ||
"endTimeEpoch" : 1425081766912, | ||
"startTimeEpoch" : 1425081758277, | ||
"lastUpdatedEpoch" : 0 | ||
|
@@ -116,6 +125,7 @@ | |
"duration" : 9011, | ||
"sparkUser" : "irashid", | ||
"completed" : true, | ||
"appSparkVersion" : "", | ||
"endTimeEpoch" : 1422981788731, | ||
"startTimeEpoch" : 1422981779720, | ||
"lastUpdatedEpoch" : 0 | ||
|
@@ -130,6 +140,7 @@ | |
"duration" : 8635, | ||
"sparkUser" : "irashid", | ||
"completed" : true, | ||
"appSparkVersion" : "", | ||
"endTimeEpoch" : 1422981766912, | ||
"startTimeEpoch" : 1422981758277, | ||
"lastUpdatedEpoch" : 0 | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think adding this line wrap is necessary