Skip to content

Commit

Permalink
AL-5705 when taskId is not empty, add taskId subStr(0,8) in local mod…
Browse files Browse the repository at this point in the history
…e's eventlog name (apache#449)

* AL-5705 update pom version

* AL-5705 when taskId is not empty, add taskId subStr(0,8) in local mode's eventlog name

* AL-5705 add UT

Co-authored-by: longfei.jiang <longfei.jiang@kyligence.io>
  • Loading branch information
jlfsdtc and longfei.jiang authored May 6, 2022
1 parent 1db8bca commit 32e72ce
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,20 @@ class SingleEventLogFileWriter(
hadoopConf: Configuration)
extends EventLogFileWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) {

override val logPath: String = SingleEventLogFileWriter.getLogPath(logBaseDir, appId,
appAttemptId, compressionCodecName)
override val logPath: String = addTaskIdInLogPathIfExist()

protected def inProgressPath = logPath + EventLogFileWriter.IN_PROGRESS

def addTaskIdInLogPathIfExist(): String = {
val taskId = sparkConf.get("spark.driver.param.taskId", "")
val localMaster = sparkConf.get("spark.master", "local").startsWith("local")
if (taskId.nonEmpty && localMaster) {
return SingleEventLogFileWriter.getLogPath(logBaseDir, appId,
appAttemptId, compressionCodecName) + "-" + taskId.substring(0, 8)
}
SingleEventLogFileWriter.getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)
}

override def start(): Unit = {
requireLogBaseDirAsDirectory()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,4 +372,31 @@ class RollingEventLogFilesWriterSuite extends EventLogFileWritersSuite {
fileSystem.listStatus(logDirPath).filter(isEventLogFile)
.sortBy { fs => getEventLogFileIndex(fs.getPath.getName) }
}

test("add TaskId In LogPath If Exist") {
val conf = new SparkConf
conf.set(EVENT_LOG_ENABLED, true)
conf.set(EVENT_LOG_DIR, testDir.toString)

val writer = EventLogFileWriter(
getUniqueApplicationId, None, testDirPath.toUri, conf,
SparkHadoopUtil.get.newConfiguration(conf)).asInstanceOf[SingleEventLogFileWriter]

var appId = writer.addTaskIdInLogPathIfExist()
assert(!appId.endsWith("-"))

conf.set("spark.driver.param.taskId", "e7eaaf98-7ee6-ae6f-b53a-d9165f336305-01")
appId = writer.addTaskIdInLogPathIfExist()
assert(appId.endsWith("e7eaaf98"))


conf.set("spark.driver.param.taskId", "")
appId = writer.addTaskIdInLogPathIfExist()
assert(!appId.endsWith("-"))

conf.set("spark.driver.param.taskId", "e7eaaf98-7ee6-ae6f-b53a-d9165f336305-01")
conf.set("spark.master", "yarn")
appId = writer.addTaskIdInLogPathIfExist()
assert(!appId.endsWith("e7eaaf98"))
}
}

0 comments on commit 32e72ce

Please sign in to comment.