Commit b74138e

Cannot delete staging dir when the clusters of spark.yarn.stagingDir and spark.hadoop.fs.defaultFS are different

sharkdtu committed Jun 19, 2017
1 parent 9a145fd commit b74138e
Showing 1 changed file with 3 additions and 4 deletions.
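
For context, the fix turns on how the Hadoop FileSystem API resolves a path: FileSystem.get(conf) always resolves against fs.defaultFS, while Path.getFileSystem(conf) resolves against the path's own scheme and authority. The sketch below is a minimal illustration, not part of the commit; the cluster names, user, and application id are hypothetical, and it assumes the hadoop-client libraries are on the classpath and both HDFS clusters are reachable.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object StagingDirFsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Hypothetical setup: the default filesystem and the staging directory
    // live on two different HDFS clusters.
    conf.set("fs.defaultFS", "hdfs://clusterA:8020")
    val stagingDir = new Path("hdfs://clusterB:8020/user/alice/.sparkStaging/app_0001")

    // Resolves against fs.defaultFS, i.e. clusterA -- deleting a clusterB path
    // through this FileSystem is what failed before the patch.
    val defaultFs = FileSystem.get(conf)
    println("default FS: " + defaultFs.getUri)   // hdfs://clusterA:8020

    // Resolves against the path's own scheme/authority, i.e. clusterB -- this is
    // the lookup the patched cleanupStagingDir() performs.
    val stagingFs = stagingDir.getFileSystem(conf)
    println("staging FS: " + stagingFs.getUri)   // hdfs://clusterB:8020
  }
}

With the patch, cleanupStagingDir() obtains the FileSystem from the staging path itself, so the delete is issued against the cluster that actually holds the staging directory.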
@@ -209,8 +209,6 @@ private[spark] class ApplicationMaster(
 
     logInfo("ApplicationAttemptId: " + appAttemptId)
 
-    val fs = FileSystem.get(yarnConf)
-
     // This shutdown hook should run *after* the SparkContext is shut down.
     val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
     ShutdownHookManager.addShutdownHook(priority) { () =>
@@ -232,7 +230,7 @@ private[spark] class ApplicationMaster(
         // we only want to unregister if we don't want the RM to retry
         if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
           unregister(finalStatus, finalMsg)
-          cleanupStagingDir(fs)
+          cleanupStagingDir()
         }
       }
     }
@@ -533,7 +531,7 @@ private[spark] class ApplicationMaster(
   /**
    * Clean up the staging directory.
    */
-  private def cleanupStagingDir(fs: FileSystem) {
+  private def cleanupStagingDir(): Unit = {
     var stagingDirPath: Path = null
     try {
       val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
@@ -544,6 +542,7 @@ private[spark] class ApplicationMaster(
           return
         }
         logInfo("Deleting staging directory " + stagingDirPath)
+        val fs = stagingDirPath.getFileSystem(yarnConf)
         fs.delete(stagingDirPath, true)
       }
     } catch {
