diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
index 3ae926d41169d..ac50cdd497a6a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
@@ -218,11 +218,20 @@ class SingleEventLogFileWriter(
     hadoopConf: Configuration)
   extends EventLogFileWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) {
 
-  override val logPath: String = SingleEventLogFileWriter.getLogPath(logBaseDir, appId,
-    appAttemptId, compressionCodecName)
+  override val logPath: String = addTaskIdInLogPathIfExist()
 
   protected def inProgressPath = logPath + EventLogFileWriter.IN_PROGRESS
 
+  /** Log path, suffixed with the first 8 chars of the driver task id when running locally. */
+  def addTaskIdInLogPathIfExist(): String = {
+    val basePath = SingleEventLogFileWriter.getLogPath(
+      logBaseDir, appId, appAttemptId, compressionCodecName)
+    val taskId = sparkConf.get("spark.driver.param.taskId", "")
+    val localMaster = sparkConf.get("spark.master", "local").startsWith("local")
+    // take(8) avoids StringIndexOutOfBoundsException for task ids shorter than 8 chars
+    if (taskId.nonEmpty && localMaster) s"$basePath-${taskId.take(8)}" else basePath
+  }
+
   override def start(): Unit = {
     requireLogBaseDirAsDirectory()
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
index b1770227d4a41..5beaacfcad29a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
@@ -372,4 +372,33 @@ class RollingEventLogFilesWriterSuite extends EventLogFileWritersSuite {
     fileSystem.listStatus(logDirPath).filter(isEventLogFile)
       .sortBy { fs => getEventLogFileIndex(fs.getPath.getName) }
   }
+
+  // NOTE(review): this exercises SingleEventLogFileWriter; consider moving it to the
+  // single-file writer suite. The asInstanceOf cast below is safe only while rolling
+  // event logs stay disabled by default in SparkConf.
+  test("add TaskId In LogPath If Exist") {
+    val conf = new SparkConf
+    conf.set(EVENT_LOG_ENABLED, true)
+    conf.set(EVENT_LOG_DIR, testDir.toString)
+
+    val writer = EventLogFileWriter(
+      getUniqueApplicationId, None, testDirPath.toUri, conf,
+      SparkHadoopUtil.get.newConfiguration(conf)).asInstanceOf[SingleEventLogFileWriter]
+
+    // No task id configured: the log path is left untouched.
+    assert(!writer.addTaskIdInLogPathIfExist().endsWith("-"))
+
+    // Task id set on a local master: the first 8 chars are appended after a dash.
+    conf.set("spark.driver.param.taskId", "e7eaaf98-7ee6-ae6f-b53a-d9165f336305-01")
+    assert(writer.addTaskIdInLogPathIfExist().endsWith("-e7eaaf98"))
+
+    // Blank task id: no suffix is appended.
+    conf.set("spark.driver.param.taskId", "")
+    assert(!writer.addTaskIdInLogPathIfExist().endsWith("-"))
+
+    // Non-local master: the task id is ignored even when set.
+    conf.set("spark.driver.param.taskId", "e7eaaf98-7ee6-ae6f-b53a-d9165f336305-01")
+    conf.set("spark.master", "yarn")
+    assert(!writer.addTaskIdInLogPathIfExist().endsWith("e7eaaf98"))
+  }
 }