[SPARK-4705] Handle multiple app attempts event logs, history server. #5432
Status: Closed · +546 −201
Commits (28):
- 0eb7722 SPARK-4705: Doing cherry-pick of fix into master
- 4c1fc26 (twinkle-g) SPARK-4705 Incorporating the review comments regarding formatting, wi…
- 6b2e521 SPARK-4705 Incorporating the review comments regarding formatting, wi…
- 318525a SPARK-4705: 1) moved from directory structure to single file, as per …
- 5fd5c6f (twinkle-g) Fix my broken rebase.
- 3245aa2 Make app attempts part of the history server model.
- 88b1de8 Add a test for apps with multiple attempts.
- cbe8bba Attempt ID in listener event should be an option.
- ce5ee5d Misc UI, test, style fixes.
- c3e0a82 Move app name to app info, more UI fixes.
- 657ec18 Fix yarn history URL, app links.
- 3a14503 Argh scalastyle.
- 9092af5 Fix HistoryServer test.
- 07446c6 Disable striping for app id / name when multiple attempts exist.
- 86de638 Merge branch 'master' into SPARK-4705
- 9092d39 Merge branch 'master' into SPARK-4705
- c14ec19 Merge branch 'master' into SPARK-4705
- f1cb9b3 Merge branch 'master' into SPARK-4705
- ba34b69 Use Option[String] for attempt id.
- d5a9c37 Update JsonProtocol test, make property name consistent.
- 9d59d92 Scalastyle...
- 2ad77e7 Missed a reference to the old property name.
- 1aa309d Improve sorting of app attempts.
- 7c381ec Merge branch 'master' into SPARK-4705
- 76a3651 Fix log cleaner, add test.
- bc885b7 Review feedback.
- f66dcc5 Merge branch 'master' into SPARK-4705
- 7e289fa Review feedback.
Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
```diff
@@ -32,16 +32,20 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
 /**
  * A class that provides application history from event logs stored in the file system.
  * This provider checks for new finished applications in the background periodically and
  * renders the history application UI by parsing the associated event logs.
  */
-private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
-  with Logging {
+private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
+  extends ApplicationHistoryProvider with Logging {
+
+  def this(conf: SparkConf) = {
+    this(conf, new SystemClock())
+  }
 
   import FsHistoryProvider._
```
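The constructor change above is the usual clock-injection pattern: production code gets a `SystemClock`, while tests can pass a controllable clock and exercise time-based logic deterministically. A minimal self-contained sketch of the idea; the `Clock` trait shape matches the diff, but the `ManualClock` below is a simplified stand-in, not Spark's own class:

```scala
// Minimal clock-injection sketch (stand-ins, not Spark's actual classes).
trait Clock {
  def getTimeMillis(): Long
}

class SystemClock extends Clock {
  override def getTimeMillis(): Long = System.currentTimeMillis()
}

// A test clock whose time only moves when the test says so.
class ManualClock(private var now: Long) extends Clock {
  override def getTimeMillis(): Long = now
  def advance(ms: Long): Unit = { now += ms }
}

object ClockSketch {
  def main(args: Array[String]): Unit = {
    val clock = new ManualClock(0L)
    val maxAge = 7L * 24 * 60 * 60 * 1000 // mirrors spark.history.fs.cleaner.maxAge = 7d
    clock.advance(maxAge + 1)
    // A provider built with this clock would now consider a log with
    // lastUpdated = 0 old enough to clean.
    println(clock.getTimeMillis() > maxAge) // true
  }
}
```

With the clock injected, a test can advance time past the max age and invoke `cleanLogs()` directly, which is why that method also becomes `private[history]` later in this diff.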
```diff
@@ -75,8 +79,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
     = new mutable.LinkedHashMap()
 
-  // List of applications to be deleted by event log cleaner.
-  private var appsToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
+  // List of application logs to be deleted by event log cleaner.
+  private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
 
   // Constants used to parse Spark 1.0.0 log directories.
   private[history] val LOG_PREFIX = "EVENT_LOG_"
```
@@ -138,31 +142,34 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis | |
|
||
override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values | ||
|
||
override def getAppUI(appId: String): Option[SparkUI] = { | ||
override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = { | ||
try { | ||
applications.get(appId).map { info => | ||
val replayBus = new ReplayListenerBus() | ||
val ui = { | ||
val conf = this.conf.clone() | ||
val appSecManager = new SecurityManager(conf) | ||
SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId, | ||
s"${HistoryServer.UI_PATH_PREFIX}/$appId") | ||
// Do not call ui.bind() to avoid creating a new server for each application | ||
} | ||
applications.get(appId).flatMap { appInfo => | ||
val attempts = appInfo.attempts.filter(_.attemptId == attemptId) | ||
attempts.headOption.map { attempt => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor: this can just be |
||
val replayBus = new ReplayListenerBus() | ||
val ui = { | ||
val conf = this.conf.clone() | ||
val appSecManager = new SecurityManager(conf) | ||
SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId, | ||
HistoryServer.getAttemptURI(appId, attempt.attemptId)) | ||
// Do not call ui.bind() to avoid creating a new server for each application | ||
} | ||
|
||
val appListener = new ApplicationEventListener() | ||
replayBus.addListener(appListener) | ||
val appInfo = replay(fs.getFileStatus(new Path(logDir, info.logPath)), replayBus) | ||
val appListener = new ApplicationEventListener() | ||
replayBus.addListener(appListener) | ||
val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus) | ||
|
||
ui.setAppName(s"${appInfo.name} ($appId)") | ||
ui.setAppName(s"${appInfo.name} ($appId)") | ||
|
||
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) | ||
ui.getSecurityManager.setAcls(uiAclsEnabled) | ||
// make sure to set admin acls before view acls so they are properly picked up | ||
ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse("")) | ||
ui.getSecurityManager.setViewAcls(appInfo.sparkUser, | ||
appListener.viewAcls.getOrElse("")) | ||
ui | ||
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) | ||
ui.getSecurityManager.setAcls(uiAclsEnabled) | ||
// make sure to set admin acls before view acls so they are properly picked up | ||
ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse("")) | ||
ui.getSecurityManager.setViewAcls(attempt.sparkUser, | ||
appListener.viewAcls.getOrElse("")) | ||
ui | ||
} | ||
} | ||
} catch { | ||
case e: FileNotFoundException => None | ||
|
```diff
@@ -220,7 +227,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    */
   private def mergeApplicationListing(logs: Seq[FileStatus]): Unit = {
     val bus = new ReplayListenerBus()
-    val newApps = logs.flatMap { fileStatus =>
+    val newAttempts = logs.flatMap { fileStatus =>
       try {
         val res = replay(fileStatus, bus)
         logInfo(s"Application log ${res.logPath} loaded successfully.")
```
@@ -232,76 +239,104 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis | |
e) | ||
None | ||
} | ||
}.toSeq.sortWith(compareAppInfo) | ||
|
||
// When there are new logs, merge the new list with the existing one, maintaining | ||
// the expected ordering (descending end time). Maintaining the order is important | ||
// to avoid having to sort the list every time there is a request for the log list. | ||
if (newApps.nonEmpty) { | ||
val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() | ||
def addIfAbsent(info: FsApplicationHistoryInfo): Unit = { | ||
if (!mergedApps.contains(info.id) || | ||
mergedApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) && | ||
!info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) { | ||
mergedApps += (info.id -> info) | ||
} | ||
} | ||
} | ||
|
||
val newIterator = newApps.iterator.buffered | ||
val oldIterator = applications.values.iterator.buffered | ||
while (newIterator.hasNext && oldIterator.hasNext) { | ||
if (compareAppInfo(newIterator.head, oldIterator.head)) { | ||
addIfAbsent(newIterator.next()) | ||
} else { | ||
addIfAbsent(oldIterator.next()) | ||
if (newAttempts.isEmpty) { | ||
return | ||
} | ||
|
||
// Build a map containing all apps that contain new attempts. The app information in this map | ||
// contains both the new app attempt, and those that were already loaded in the existing apps | ||
// map. If an attempt has been updated, it replaces the old attempt in the list. | ||
val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]() | ||
newAttempts.foreach { attempt => | ||
val appInfo = newAppMap.get(attempt.appId) | ||
.orElse(applications.get(attempt.appId)) | ||
.map { app => | ||
val attempts = | ||
app.attempts.filter(_.attemptId != attempt.attemptId).toList ++ List(attempt) | ||
new FsApplicationHistoryInfo(attempt.appId, attempt.name, | ||
attempts.sortWith(compareAttemptInfo)) | ||
} | ||
.getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt))) | ||
newAppMap(attempt.appId) = appInfo | ||
} | ||
|
||
// Merge the new app list with the existing one, maintaining the expected ordering (descending | ||
// end time). Maintaining the order is important to avoid having to sort the list every time | ||
// there is a request for the log list. | ||
val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo) | ||
val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() | ||
def addIfAbsent(info: FsApplicationHistoryInfo): Unit = { | ||
if (!mergedApps.contains(info.id)) { | ||
mergedApps += (info.id -> info) | ||
} | ||
newIterator.foreach(addIfAbsent) | ||
oldIterator.foreach(addIfAbsent) | ||
} | ||
|
||
applications = mergedApps | ||
val newIterator = newApps.iterator.buffered | ||
val oldIterator = applications.values.iterator.buffered | ||
while (newIterator.hasNext && oldIterator.hasNext) { | ||
if (newAppMap.contains(oldIterator.head.id)) { | ||
oldIterator.next() | ||
} else if (compareAppInfo(newIterator.head, oldIterator.head)) { | ||
addIfAbsent(newIterator.next()) | ||
} else { | ||
addIfAbsent(oldIterator.next()) | ||
} | ||
} | ||
newIterator.foreach(addIfAbsent) | ||
oldIterator.foreach(addIfAbsent) | ||
|
||
applications = mergedApps | ||
} | ||
|
||
/** | ||
* Delete event logs from the log directory according to the clean policy defined by the user. | ||
*/ | ||
private def cleanLogs(): Unit = { | ||
private[history] def cleanLogs(): Unit = { | ||
try { | ||
val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000 | ||
|
||
val now = System.currentTimeMillis() | ||
val now = clock.getTimeMillis() | ||
val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() | ||
|
||
def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = { | ||
now - attempt.lastUpdated > maxAge && attempt.completed | ||
} | ||
|
||
// Scan all logs from the log directory. | ||
// Only completed applications older than the specified max age will be deleted. | ||
applications.values.foreach { info => | ||
if (now - info.lastUpdated <= maxAge || !info.completed) { | ||
appsToRetain += (info.id -> info) | ||
} else { | ||
appsToClean += info | ||
applications.values.foreach { app => | ||
val (toClean, toRetain) = app.attempts.partition(shouldClean) | ||
attemptsToClean ++= toClean | ||
|
||
if (toClean.isEmpty) { | ||
appsToRetain += (app.id -> app) | ||
} else if (toRetain.nonEmpty) { | ||
appsToRetain += (app.id -> | ||
new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList)) | ||
} | ||
} | ||
|
||
applications = appsToRetain | ||
|
||
val leftToClean = new mutable.ListBuffer[FsApplicationHistoryInfo] | ||
appsToClean.foreach { info => | ||
val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo] | ||
attemptsToClean.foreach { attempt => | ||
try { | ||
val path = new Path(logDir, info.logPath) | ||
val path = new Path(logDir, attempt.logPath) | ||
if (fs.exists(path)) { | ||
fs.delete(path, true) | ||
} | ||
} catch { | ||
case e: AccessControlException => | ||
logInfo(s"No permission to delete ${info.logPath}, ignoring.") | ||
logInfo(s"No permission to delete ${attempt.logPath}, ignoring.") | ||
case t: IOException => | ||
logError(s"IOException in cleaning logs of ${info.logPath}", t) | ||
leftToClean += info | ||
logError(s"IOException in cleaning ${attempt.logPath}", t) | ||
leftToClean += attempt | ||
} | ||
} | ||
|
||
appsToClean = leftToClean | ||
attemptsToClean = leftToClean | ||
} catch { | ||
case t: Exception => logError("Exception in cleaning logs", t) | ||
} | ||
|
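The comment in the hunk above notes that an updated attempt replaces the stale entry rather than being appended. A small self-contained sketch of that replace-and-append step, using a hypothetical `Attempt` case class in place of `FsApplicationAttemptInfo`:

```scala
// Simplified stand-in for FsApplicationAttemptInfo: one row per attempt.
case class Attempt(appId: String, attemptId: Option[String], endTime: Long, completed: Boolean)

object MergeSketch {
  def main(args: Array[String]): Unit = {
    // Existing listing for app_1: attempt "1" finished, attempt "2" still running.
    val existing = List(
      Attempt("app_1", Some("1"), endTime = 100L, completed = true),
      Attempt("app_1", Some("2"), endTime = -1L, completed = false))

    // A rescan finds an updated log for attempt "2": it has now completed.
    val updated = Attempt("app_1", Some("2"), endTime = 200L, completed = true)

    // Same idea as the foreach above: drop the stale attempt, append the new one.
    val merged = existing.filter(_.attemptId != updated.attemptId) ++ List(updated)

    println(merged) // attempt "2" appears exactly once, with its completed state
  }
}
```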
```diff
@@ -315,14 +350,36 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private def compareAppInfo(
       i1: FsApplicationHistoryInfo,
       i2: FsApplicationHistoryInfo): Boolean = {
-    if (i1.endTime != i2.endTime) i1.endTime >= i2.endTime else i1.startTime >= i2.startTime
+    val a1 = i1.attempts.head
+    val a2 = i2.attempts.head
+    if (a1.endTime != a2.endTime) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
   }
 
+  /**
+   * Comparison function that defines the sort order for application attempts within the same
+   * application. Order is: running attempts before complete attempts, running attempts sorted
+   * by start time, completed attempts sorted by end time.
+   *
+   * Normally applications should have a single running attempt; but failure to call sc.stop()
+   * may cause multiple running attempts to show up.
+   *
+   * @return Whether `a1` should precede `a2`.
+   */
+  private def compareAttemptInfo(
+      a1: FsApplicationAttemptInfo,
+      a2: FsApplicationAttemptInfo): Boolean = {
+    if (a1.completed == a2.completed) {
+      if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
+    } else {
+      !a1.completed
+    }
+  }
+
   /**
    * Replays the events in the specified log file and returns information about the associated
    * application.
    */
-  private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
+  private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
     val logInput =
```
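The doc comment above defines the attempt ordering in prose; here is a self-contained sketch of the same predicate applied to sample data, where the `AttemptInfo` case class is a hypothetical stand-in with just the fields the comparator reads:

```scala
// Hypothetical stand-in with only the fields compareAttemptInfo looks at.
case class AttemptInfo(startTime: Long, endTime: Long, completed: Boolean)

object SortSketch {
  // Same logic as compareAttemptInfo: running before completed; running by
  // start time (newest first), completed by end time (newest first).
  def precedes(a1: AttemptInfo, a2: AttemptInfo): Boolean = {
    if (a1.completed == a2.completed) {
      if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
    } else {
      !a1.completed
    }
  }

  def main(args: Array[String]): Unit = {
    val attempts = List(
      AttemptInfo(startTime = 10, endTime = 50, completed = true),  // old, finished
      AttemptInfo(startTime = 60, endTime = -1, completed = false), // still running
      AttemptInfo(startTime = 20, endTime = 90, completed = true))  // newest, finished

    // Prints the running attempt first, then the completed ones by descending end time.
    attempts.sortWith(precedes).foreach(println)
  }
}
```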
```diff
@@ -336,10 +393,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     val appCompleted = isApplicationCompleted(eventLog)
     bus.addListener(appListener)
     bus.replay(logInput, logPath.toString, !appCompleted)
-    new FsApplicationHistoryInfo(
+    new FsApplicationAttemptInfo(
       logPath.getName(),
-      appListener.appId.getOrElse(logPath.getName()),
       appListener.appName.getOrElse(NOT_STARTED),
+      appListener.appId.getOrElse(logPath.getName()),
+      appListener.appAttemptId,
       appListener.startTime.getOrElse(-1L),
       appListener.endTime.getOrElse(-1L),
       getModificationTime(eventLog).get,
```
```diff
@@ -425,13 +483,21 @@ private object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
 }
 
-private class FsApplicationHistoryInfo(
+private class FsApplicationAttemptInfo(
     val logPath: String,
-    id: String,
-    name: String,
+    val name: String,
+    val appId: String,
+    attemptId: Option[String],
     startTime: Long,
     endTime: Long,
     lastUpdated: Long,
     sparkUser: String,
     completed: Boolean = true)
-  extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed)
+  extends ApplicationAttemptInfo(
+    attemptId, startTime, endTime, lastUpdated, sparkUser, completed)
+
+private class FsApplicationHistoryInfo(
+    id: String,
+    override val name: String,
+    override val attempts: List[FsApplicationAttemptInfo])
+  extends ApplicationHistoryInfo(id, name, attempts)
```
Review comment:

> The doc for `getAppUI` says to use an empty string for apps with a single attempt -- but that isn't exactly what is reflected here. Some YARN apps will be successful on the first attempt, but with this implementation, you still need to pass in the actual attempt ID. One way or the other, the doc and this implementation should be reconciled.

Reply:

> The interface doc is slightly misleading, but all event logs from YARN will have an attempt ID after this change, even for a single attempt.
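Following up the thread above, a sketch of the exact-match lookup the new `getAppUI` performs, with hypothetical stand-in types; it shows why a YARN app now needs its real attempt ID even when there was only one attempt:

```scala
// Hypothetical stand-ins mirroring the lookup inside getAppUI above.
case class Attempt(attemptId: Option[String], logPath: String)
case class AppInfo(attempts: List[Attempt])

object LookupSketch {
  // The provider matches the requested attemptId exactly: an app whose attempts
  // carry no ID (e.g. non-YARN logs) is addressed with None, while a YARN
  // attempt is addressed with Some("1"), Some("2"), ...
  def findAttempt(app: AppInfo, attemptId: Option[String]): Option[Attempt] =
    app.attempts.find(_.attemptId == attemptId)

  def main(args: Array[String]): Unit = {
    val yarnApp = AppInfo(List(Attempt(Some("1"), "app_1_attempt1.log")))
    println(findAttempt(yarnApp, Some("1"))) // Some(...): exact match required
    println(findAttempt(yarnApp, None))      // None: no attempt without an ID
  }
}
```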