Skip to content

Commit

Permalink
Feedback.
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcelo Vanzin committed Aug 16, 2017
1 parent 519dab0 commit dc642bd
Showing 1 changed file with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

private val storePath = conf.get(LOCAL_STORE_DIR)

private val listing = storePath.map { path =>
private val listing: KVStore = storePath.map { path =>
val dbPath = new File(path, "listing.ldb")

def openDB(): LevelDB = new LevelDB(dbPath, new KVStoreScalaSerializer())
Expand Down Expand Up @@ -334,10 +334,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

override def stop(): Unit = {
listing.close()
if (initThread != null && initThread.isAlive()) {
initThread.interrupt()
initThread.join()
try {
if (initThread != null && initThread.isAlive()) {
initThread.interrupt()
initThread.join()
}
} finally {
listing.close()
}
}

Expand All @@ -362,7 +365,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) &&
recordedFileSize(entry.getPath()) < entry.getLen()
}
.flatMap { entry => Some(entry) }
.sortWith { case (entry1, entry2) =>
entry1.getModificationTime() > entry2.getModificationTime()
}
Expand Down Expand Up @@ -788,7 +790,7 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends

def applicationInfo: Option[ApplicationInfoWrapper] = {
if (app.id != null) {
Some(app.toView(List(attempt.toView())))
Some(app.toView())
} else {
None
}
Expand All @@ -802,10 +804,10 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends
var coresPerExecutor: Option[Int] = None
var memoryPerExecutorMB: Option[Int] = None

def toView(attempts: List[AttemptInfoWrapper]): ApplicationInfoWrapper = {
def toView(): ApplicationInfoWrapper = {
val apiInfo = new v1.ApplicationInfo(id, name, coresGranted, maxCores, coresPerExecutor,
memoryPerExecutorMB, Nil)
new ApplicationInfoWrapper(apiInfo, attempts)
new ApplicationInfoWrapper(apiInfo, List(attempt.toView()))
}

}
Expand Down

0 comments on commit dc642bd

Please sign in to comment.