
[SPARK-20355] Add per application spark version on the history server headerpage #17658

Closed
wants to merge 14 commits
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -449,7 +449,7 @@ class SparkContext(config: SparkConf) extends Logging {

_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
Some(SparkUI.createLiveUI(this, _conf, listenerBus, SPARK_VERSION, _jobProgressListener,
_env.securityManager, appName, startTime = startTime))
} else {
// For tests, do not enable the UI
@@ -243,18 +243,19 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
applications.get(appId).flatMap { appInfo =>
appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt =>
val replayBus = new ReplayListenerBus()
val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)

val ui = {
val conf = this.conf.clone()
val appSecManager = new SecurityManager(conf)
SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name,
HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime)
SparkUI.createHistoryUI(conf, replayBus, appSecManager,
appListener.appSparkVersion.getOrElse(""), appInfo.name,
HistoryServer.getAttemptURI(appId, attempt.attemptId),
attempt.startTime)
Member

I don't think adding this line wrap is necessary

// Do not call ui.bind() to avoid creating a new server for each application
}

val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))

val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
Contributor
@jerryshao Apr 19, 2017

Is it correct to remove these two lines? As far as I know they are necessary, because the previous replay call only parses the SparkListenerApplicationStart and SparkListenerApplicationEnd events.

Sorry, I just saw that you moved these two lines up.


if (appListener.appId.isDefined) {
ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE)
// make sure to set admin acls before view acls so they are properly picked up
@@ -34,6 +34,7 @@ private[spark] class ApplicationEventListener extends SparkListener {
var adminAcls: Option[String] = None
var viewAclsGroups: Option[String] = None
var adminAclsGroups: Option[String] = None
var appSparkVersion: Option[String] = None

override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
appName = Some(applicationStart.appName)
@@ -57,4 +58,9 @@ private[spark] class ApplicationEventListener extends SparkListener {
adminAclsGroups = allProperties.get("spark.admin.acls.groups")
}
}

override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
case SparkListenerLogStart(sparkVersion) =>
appSparkVersion = Some(sparkVersion)
}
Contributor

You need a "catch all" here:

scala> class MyListener extends org.apache.spark.scheduler.SparkListener {
     |   override def onOtherEvent(event: org.apache.spark.scheduler.SparkListenerEvent): Unit = event match {
     |     case _: org.apache.spark.scheduler.SparkListenerTaskStart => ()
     |   }
     | }
defined class MyListener
scala> sc.addSparkListener(new MyListener())
scala> spark.range(1000).write.saveAsTable("test")
17/04/19 10:39:01 ERROR LiveListenerBus: Listener MyListener threw an exception
scala.MatchError: SparkListenerSQLExecutionStart(0,saveAsTable at <console>:27,org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:354)

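For reference, a minimal sketch of the fix this comment asks for — assuming the listener keeps the same `appSparkVersion` field; the merged code may differ:

```scala
// Sketch of the suggested fix: a wildcard case prevents scala.MatchError
// when any other SparkListenerEvent (e.g. a SQL execution event) arrives.
override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
  case SparkListenerLogStart(sparkVersion) =>
    appSparkVersion = Some(sparkVersion)
  case _ => // ignore all other events
}
```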
}
@@ -160,7 +160,6 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent

/**
* An internal class that describes the metadata of an event log.
* This event is not meant to be posted to listeners downstream.
Member

Is there a reason this note was here? What was the reasoning behind it?

Author

This was only for metadata, so when it was written it was not meant to be consumed downstream; now we can reuse it for this case.

*/
private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent

@@ -71,7 +71,6 @@ private[spark] trait SparkListenerBus
listener.onNodeUnblacklisted(nodeUnblacklisted)
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
case logStart: SparkListenerLogStart => // ignore event log metadata
Member
I haven't looked at this closely, but will allowing this through cause any issues? Why was it ignored before?

Author

I do not see it being consumed anywhere apart from registering some metadata, so I think it should be fine, as this event already carries the version.

Contributor

Will this change the behavior of SparkListenerBus? Previously we ignored this event, but with this change it will be posted through listener.onOtherEvent, so users will receive it. However, from the code, this event is not accessible outside of Spark:

private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent

Contributor

I think that if this is going to be posted on the bus, it would be better to make it public and @DeveloperApi like the other events.

Adding new events is not an issue, but posting an event that people can't handle is a little odd.
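If the event were made public as suggested, the declaration might look like this — a sketch of the reviewer's suggestion, not the merged code:

```scala
import org.apache.spark.annotation.DeveloperApi

// Public and annotated like the other listener events, so external
// listeners can pattern-match on it without a visibility error.
@DeveloperApi
case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
```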

Author

I am not sure it will change any behavior; that is precisely why we post it through onOtherEvent, so that someone who wants to listen for the event can utilize it, as in this scenario.
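For illustration, a user listener could then consume the event via onOtherEvent — a hypothetical sketch that only compiles if SparkListenerLogStart is visible outside Spark, which is exactly the visibility concern raised above:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerLogStart}

// Hypothetical user-side listener that records the Spark version
// carried by the event-log metadata event.
class VersionListener extends SparkListener {
  @volatile var loggedVersion: Option[String] = None

  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SparkListenerLogStart(version) => loggedVersion = Some(version)
    case _ => // not interested in other events
  }
}
```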

case _ => listener.onOtherEvent(event)
}
}
18 changes: 12 additions & 6 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -48,6 +48,7 @@ private[spark] class SparkUI private (
val jobProgressListener: JobProgressListener,
val storageListener: StorageListener,
val operationGraphListener: RDDOperationGraphListener,
val appSparkVersion: String,
var appName: String,
val basePath: String,
val startTime: Long)
@@ -139,6 +140,8 @@ private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String)

def appName: String = parent.appName

def appSparkVersion: String = parent.appSparkVersion

Contributor

nit: remove the extra blank line if you are making changes.

}

private[spark] object SparkUI {
@@ -156,23 +159,25 @@ private[spark] object SparkUI {
sc: SparkContext,
conf: SparkConf,
listenerBus: SparkListenerBus,
appSparkVersion: String,
jobProgressListener: JobProgressListener,
securityManager: SecurityManager,
appName: String,
startTime: Long): SparkUI = {
create(Some(sc), conf, listenerBus, securityManager, appName,
create(Some(sc), conf, listenerBus, securityManager, appSparkVersion, appName,
jobProgressListener = Some(jobProgressListener), startTime = startTime)
}

def createHistoryUI(
conf: SparkConf,
listenerBus: SparkListenerBus,
securityManager: SecurityManager,
appSparkVersion: String,
appName: String,
basePath: String,
startTime: Long): SparkUI = {
val sparkUI = create(
None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)
val sparkUI = create(None, conf, listenerBus, securityManager,
appSparkVersion, appName, basePath, startTime = startTime)

val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory],
Utils.getContextOrSparkClassLoader).asScala
@@ -195,6 +200,7 @@
conf: SparkConf,
listenerBus: SparkListenerBus,
securityManager: SecurityManager,
appSparkVersion: String,
appName: String,
basePath: String = "",
jobProgressListener: Option[JobProgressListener] = None,
@@ -218,8 +224,8 @@
listenerBus.addListener(storageListener)
listenerBus.addListener(operationGraphListener)

new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
executorsListener, _jobProgressListener, storageListener, operationGraphListener,
appName, basePath, startTime)
new SparkUI(sc, conf, securityManager, environmentListener,
storageStatusListener, executorsListener, _jobProgressListener, storageListener,
operationGraphListener, appSparkVersion, appName, basePath, startTime)
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -228,7 +228,7 @@ private[spark] object UIUtils extends Logging {
<div class="brand">
<a href={prependBaseUri("/")} class="brand">
<img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} />
<span class="version">{org.apache.spark.SPARK_VERSION}</span>
<span class="version">{activeTab.appSparkVersion}</span>
</a>
</div>
<p class="navbar-text pull-right">