From 76a365182fffd9878304f49805e40553eba07b1e Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Fri, 24 Apr 2015 13:06:30 -0700
Subject: [PATCH] Fix log cleaner, add test.

---
 .../deploy/history/FsHistoryProvider.scala | 51 ++++++++++++-------
 .../history/FsHistoryProviderSuite.scala   | 48 ++++++++++++++++-
 2 files changed, 79 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 68b882857543b..a4d3308efae01 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
 /**
@@ -40,8 +40,12 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
  * This provider checks for new finished applications in the background periodically and
  * renders the history application UI by parsing the associated event logs.
  */
-private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
-  with Logging {
+private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
+  extends ApplicationHistoryProvider with Logging {
+
+  def this(conf: SparkConf) = {
+    this(conf, new SystemClock())
+  }
 
   import FsHistoryProvider._
 
@@ -75,8 +79,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
     = new mutable.LinkedHashMap()
 
-  // List of applications to be deleted by event log cleaner.
-  private var appsToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
+  // List of application logs to be deleted by event log cleaner.
+  private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
 
   // Constants used to parse Spark 1.0.0 log directories.
   private[history] val LOG_PREFIX = "EVENT_LOG_"
@@ -289,42 +293,51 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   /**
    * Delete event logs from the log directory according to the clean policy defined by the user.
    */
-  private def cleanLogs(): Unit = {
+  private[history] def cleanLogs(): Unit = {
     try {
       val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000
 
-      val now = System.currentTimeMillis()
+      val now = clock.getTimeMillis()
       val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
 
+      def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = {
+        now - attempt.lastUpdated > maxAge && attempt.completed
+      }
+
       // Scan all logs from the log directory.
       // Only completed applications older than the specified max age will be deleted.
-      applications.values.foreach { info =>
-        if (now - info.attempts.head.lastUpdated <= maxAge || !info.attempts.head.completed) {
-          appsToRetain += (info.id -> info)
-        } else {
-          appsToClean += info
+      applications.values.foreach { app =>
+        val toClean = app.attempts.filter(shouldClean)
+        attemptsToClean ++= toClean
+
+        if (toClean.isEmpty) {
+          appsToRetain += (app.id -> app)
+        } else if (toClean.size < app.attempts.size) {
+          appsToRetain += (app.id ->
+            new FsApplicationHistoryInfo(app.id, app.name,
+              app.attempts.filter(!shouldClean(_)).toList))
         }
       }
 
       applications = appsToRetain
 
-      val leftToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
-      appsToClean.foreach { info =>
+      val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
+      attemptsToClean.foreach { attempt =>
         try {
-          val path = new Path(logDir, info.logPath)
+          val path = new Path(logDir, attempt.logPath)
           if (fs.exists(path)) {
             fs.delete(path, true)
           }
         } catch {
           case e: AccessControlException =>
-            logInfo(s"No permission to delete ${info.logPath}, ignoring.")
+            logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
           case t: IOException =>
-            logError(s"IOException in cleaning logs of ${info.logPath}", t)
-            leftToClean += info
+            logError(s"IOException in cleaning ${attempt.logPath}", t)
+            leftToClean += attempt
         }
       }
 
-      appsToClean = leftToClean
+      attemptsToClean = leftToClean
     } catch {
       case t: Exception => logError("Exception in cleaning logs", t)
     }

diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index bfee97e1d3a21..32a0e891f7026 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.history
 
 import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
 import java.net.URI
+import java.util.concurrent.TimeUnit
 
 import scala.io.Source
 
@@ -30,7 +31,7 @@ import org.scalatest.Matchers
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.io._
 import org.apache.spark.scheduler._
-import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.util.{JsonProtocol, ManualClock, Utils}
 
 class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
 
@@ -283,6 +284,50 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
     }
   }
 
+  test("log cleaner") {
+    val maxAge = TimeUnit.SECONDS.toMillis(10)
+    val clock = new ManualClock(maxAge / 2)
+    val provider = new FsHistoryProvider(
+      createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
+
+    val log1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+    writeFile(log1, true, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
+      SparkListenerApplicationEnd(2L)
+      )
+    log1.setLastModified(0L)
+
+    val log2 = newLogFile("app1", Some("attempt2"), inProgress = false)
+    writeFile(log2, true, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
+      SparkListenerApplicationEnd(4L)
+      )
+    log2.setLastModified(clock.getTimeMillis())
+
+    updateAndCheck(provider) { list =>
+      list.size should be (1)
+      list.head.attempts.size should be (2)
+    }
+
+    // Move the clock forward so log1 exceeds the max age.
+    clock.advance(maxAge)
+
+    updateAndCheck(provider) { list =>
+      list.size should be (1)
+      list.head.attempts.size should be (1)
+      list.head.attempts.head.attemptId should be ("attempt2")
+    }
+    assert(!log1.exists())
+
+    // Do the same for the other log.
+    clock.advance(maxAge)
+
+    updateAndCheck(provider) { list =>
+      list.size should be (0)
+    }
+    assert(!log2.exists())
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:
@@ -294,6 +339,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
   private def updateAndCheck(provider: FsHistoryProvider)
       (checkFn: Seq[ApplicationHistoryInfo] => Unit): Unit = {
     provider.checkForLogs()
+    provider.cleanLogs()
     checkFn(provider.getListing().toSeq)
   }
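
Note on the cleaner fix: the old loop decided retention for a whole application
by looking only at attempts.head, so an app with one live and one expired
attempt was kept or deleted wholesale. The rewrite applies shouldClean() per
attempt and rebuilds the app's entry with only the surviving attempts. A
minimal standalone sketch of that filtering logic, using simplified stand-in
case classes (App/Attempt below are illustrative, not the patch's
FsApplicationHistoryInfo/FsApplicationAttemptInfo):

    case class Attempt(id: String, lastUpdated: Long, completed: Boolean)
    case class App(id: String, attempts: List[Attempt])

    def clean(apps: List[App], now: Long, maxAgeMs: Long): List[App] = {
      // Same rule as shouldClean(): expire only completed, sufficiently old attempts.
      def shouldClean(a: Attempt) = now - a.lastUpdated > maxAgeMs && a.completed
      apps.flatMap { app =>
        val kept = app.attempts.filterNot(shouldClean)
        // Drop the application only when every one of its attempts has expired.
        if (kept.isEmpty) None else Some(app.copy(attempts = kept))
      }
    }

    val apps = List(App("app1", List(
      Attempt("attempt1", lastUpdated = 0L, completed = true),
      Attempt("attempt2", lastUpdated = 9000L, completed = true))))
    // With now = 12000 and maxAge = 10000, attempt1 expires but attempt2 survives.
    assert(clean(apps, now = 12000L, maxAgeMs = 10000L) ==
      List(App("app1", List(Attempt("attempt2", 9000L, completed = true)))))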
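
Note on testability: cleanLogs() now reads time through an injected Clock
(SystemClock in production, ManualClock in the test), so the test advances
virtual time deterministically instead of sleeping. A minimal sketch of that
pattern, assuming only the Clock/ManualClock API already exercised by this
patch; the Expiry class is hypothetical:

    import org.apache.spark.util.{Clock, ManualClock}

    // Time-dependent logic takes a Clock rather than calling
    // System.currentTimeMillis() directly.
    class Expiry(maxAgeMs: Long, clock: Clock) {
      def isExpired(lastUpdated: Long): Boolean =
        clock.getTimeMillis() - lastUpdated > maxAgeMs
    }

    val clock = new ManualClock(0L)
    val expiry = new Expiry(10000L, clock)
    assert(!expiry.isExpired(0L)) // age is exactly 0 at clock time 0
    clock.advance(10001L)         // step virtual time past maxAge
    assert(expiry.isExpired(0L))  // deterministic, no Thread.sleep needed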