[SPARK-4705] Handle multiple app attempts event logs, history server. #5432

Closed · wants to merge 28 commits

Changes from 25 commits
0eb7722
SPARK-4705: Doing cherry-pick of fix into master
twinkle-g Feb 2, 2015
4c1fc26
SPARK-4705 Incorporating the review comments regarding formatting, wi…
Feb 25, 2015
6b2e521
SPARK-4705 Incorporating the review comments regarding formatting, wi…
Feb 25, 2015
318525a
SPARK-4705: 1) moved from directory structure to single file, as per …
twinkle-g Mar 1, 2015
5fd5c6f
Fix my broken rebase.
Apr 7, 2015
3245aa2
Make app attempts part of the history server model.
Apr 8, 2015
88b1de8
Add a test for apps with multiple attempts.
Apr 8, 2015
cbe8bba
Attempt ID in listener event should be an option.
Apr 8, 2015
ce5ee5d
Misc UI, test, style fixes.
Apr 8, 2015
c3e0a82
Move app name to app info, more UI fixes.
Apr 9, 2015
657ec18
Fix yarn history URL, app links.
Apr 9, 2015
3a14503
Argh scalastyle.
Apr 9, 2015
9092af5
Fix HistoryServer test.
Apr 9, 2015
07446c6
Disable striping for app id / name when multiple attempts exist.
Apr 10, 2015
86de638
Merge branch 'master' into SPARK-4705
Apr 13, 2015
9092d39
Merge branch 'master' into SPARK-4705
Apr 17, 2015
c14ec19
Merge branch 'master' into SPARK-4705
Apr 20, 2015
f1cb9b3
Merge branch 'master' into SPARK-4705
Apr 23, 2015
ba34b69
Use Option[String] for attempt id.
Apr 23, 2015
d5a9c37
Update JsonProtocol test, make property name consistent.
Apr 23, 2015
9d59d92
Scalastyle...
Apr 23, 2015
2ad77e7
Missed a reference to the old property name.
Apr 23, 2015
1aa309d
Improve sorting of app attempts.
Apr 23, 2015
7c381ec
Merge branch 'master' into SPARK-4705
Apr 24, 2015
76a3651
Fix log cleaner, add test.
Apr 24, 2015
bc885b7
Review feedback.
Apr 28, 2015
f66dcc5
Merge branch 'master' into SPARK-4705
Apr 28, 2015
7e289fa
Review feedback.
Apr 30, 2015
34 changes: 19 additions & 15 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -217,6 +217,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private var _heartbeatReceiver: RpcEndpointRef = _
@volatile private var _dagScheduler: DAGScheduler = _
private var _applicationId: String = _
private var _applicationAttemptId: Option[String] = None
private var _eventLogger: Option[EventLoggingListener] = None
private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
private var _cleaner: Option[ContextCleaner] = None
@@ -312,6 +313,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

def applicationId: String = _applicationId
def applicationAttemptId: Option[String] = _applicationAttemptId

def metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null

@@ -469,6 +471,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_taskScheduler.start()

_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
_env.blockManager.initialize(_applicationId)

@@ -481,7 +484,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_eventLogger =
if (isEventLogEnabled) {
      val logger =
-       new EventLoggingListener(_applicationId, _eventLogDir.get, _conf, _hadoopConfiguration)
+       new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
+         _conf, _hadoopConfiguration)
logger.start()
listenerBus.addListener(logger)
Some(logger)
@@ -1843,7 +1847,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
    listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
-     startTime, sparkUser))
+     startTime, sparkUser, applicationAttemptId))
}
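The hunk above threads the attempt id into the application-start event. Since not every cluster manager supplies an attempt id, the field is an `Option`. A minimal, self-contained sketch of the pattern — the names here are illustrative stand-ins, not the exact Spark signatures:

```scala
// Illustrative sketch of an application-start event carrying an optional
// attempt id, mirroring the shape of the modified
// SparkListenerApplicationStart (not the actual Spark class).
case class AppStartEvent(
    appName: String,
    appId: Option[String],
    time: Long,
    sparkUser: String,
    appAttemptId: Option[String])

// An event-log name can then encode the attempt only when one exists,
// so single-attempt applications keep their old log names.
def logName(appId: String, attemptId: Option[String]): String =
  attemptId.map(a => s"${appId}_$a").getOrElse(appId)
```

With this scheme, an app with an attempt id gets a distinct log per attempt, while standalone-mode apps without attempt ids keep a plain per-app log name.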

/** Post the application end event */
@@ -1891,7 +1895,7 @@ object SparkContext extends Logging {
*
* Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK.
*/
  private val activeContext: AtomicReference[SparkContext] =
new AtomicReference[SparkContext](null)

/**
@@ -1944,11 +1948,11 @@ object SparkContext extends Logging {
}

  /**
   * This function may be used to get or instantiate a SparkContext and register it as a
   * singleton object. Because we can only have one active SparkContext per JVM,
   * this is useful when applications may wish to share a SparkContext.
   *
   * Note: This function cannot be used to create multiple SparkContext instances
   * even if multiple contexts are allowed.
   */
def getOrCreate(config: SparkConf): SparkContext = {
@@ -1961,17 +1965,17 @@ object SparkContext extends Logging {
activeContext.get()
}
}

  /**
   * This function may be used to get or instantiate a SparkContext and register it as a
   * singleton object. Because we can only have one active SparkContext per JVM,
   * this is useful when applications may wish to share a SparkContext.
   *
   * This method allows not passing a SparkConf (useful if just retrieving).
   *
   * Note: This function cannot be used to create multiple SparkContext instances
   * even if multiple contexts are allowed.
   */
def getOrCreate(): SparkContext = {
getOrCreate(new SparkConf())
}
@@ -19,15 +19,19 @@ package org.apache.spark.deploy.history

import org.apache.spark.ui.SparkUI

-private[history] case class ApplicationHistoryInfo(
-    id: String,
-    name: String,
+private[history] case class ApplicationAttemptInfo(
+    attemptId: String,
     startTime: Long,
     endTime: Long,
     lastUpdated: Long,
     sparkUser: String,
     completed: Boolean = false)
+
+private[history] case class ApplicationHistoryInfo(
+    id: String,
+    name: String,
+    attempts: List[ApplicationAttemptInfo])

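To make the new two-level model concrete, here is a hedged sketch using stand-alone stand-ins for the case classes above (simplified field sets, not the real Spark types): one application record aggregates every attempt, sorted newest-first the way a history listing would show them.

```scala
// Simplified stand-ins mirroring the case classes introduced in this diff.
case class AttemptInfo(
    attemptId: String,
    startTime: Long,
    endTime: Long,
    completed: Boolean = false)

case class AppHistoryInfo(
    id: String,
    name: String,
    attempts: List[AttemptInfo])

// A second attempt of the same application does not create a new app
// entry; it joins the attempt list, sorted newest-first so the latest
// attempt appears at the top of the history server UI.
val raw = List(
  AttemptInfo("attempt1", startTime = 100L, endTime = 200L, completed = true),
  AttemptInfo("attempt2", startTime = 300L, endTime = 400L, completed = false))

val app = AppHistoryInfo("app-1", "ExampleApp", raw.sortBy(a => -a.startTime))
```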
private[history] abstract class ApplicationHistoryProvider {

/**
@@ -41,9 +45,11 @@ private[history] abstract class ApplicationHistoryProvider {
* Returns the Spark UI for a specific application.
*
* @param appId The application ID.
* @param attemptId The application attempt ID for apps with multiple attempts (or an empty
* string for apps with a single attempt).
* @return The application's UI, or None if application is not found.
*/
-  def getAppUI(appId: String): Option[SparkUI]
+  def getAppUI(appId: String, attemptId: String): Option[SparkUI]
Review comment (Contributor): any reason not to use an Option[String] for attemptId here (and everywhere else) as well?
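This suggestion was later adopted (see the "Use Option[String] for attempt id." commit in the list above). A hypothetical in-memory provider illustrates why `Option[String]` reads better than an empty-string sentinel: `None` states "single, unnamed attempt" explicitly. The types here are illustrative, not the real Spark API (the UI is a plain `String` instead of `SparkUI`):

```scala
// Hypothetical provider keyed by (appId, optional attemptId).
trait Provider {
  def getAppUI(appId: String, attemptId: Option[String]): Option[String]
}

class InMemoryProvider(uis: Map[(String, Option[String]), String]) extends Provider {
  // None means "the app's single, unnamed attempt" rather than a magic "".
  def getAppUI(appId: String, attemptId: Option[String]): Option[String] =
    uis.get((appId, attemptId))
}

val provider = new InMemoryProvider(Map(
  ("app-1", None)             -> "ui-single-attempt",
  ("app-2", Some("attempt1")) -> "ui-attempt1"))
```

A lookup for an unknown attempt simply returns `None`, with no need to special-case the empty string anywhere.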


/**
* Called when the server is shutting down.