diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 61570339b1a99..948d16a6e7020 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -80,6 +80,16 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
 
   private val serializableConfOpt = conf.map(new SerializableWritable(_))
 
+  /**
+   * Minimum duration of remembering the information of selected files. Defaults to 1 minute.
+   *
+   * Files with mod times older than this "window" of remembering will be ignored. So if new
+   * files are visible within this window, then the file will get selected in the next batch.
+   */
+  private val minRememberDurationMin = Minutes(ssc.sparkContext.getConf
+    .get("spark.streaming.minRememberDurationMin", "1")
+    .toLong)
+
   // This is a def so that it works during checkpoint recovery:
   private def clock = ssc.scheduler.clock
 
@@ -95,7 +105,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
    * This would allow us to filter away not-too-old files which have already been recently
    * selected and processed.
    */
-  private val numBatchesToRemember = FileInputDStream.calculateNumBatchesToRemember(slideDuration)
+  private val numBatchesToRemember = FileInputDStream
+    .calculateNumBatchesToRemember(slideDuration, minRememberDurationMin)
   private val durationToRemember = slideDuration * numBatchesToRemember
   remember(durationToRemember)
 
@@ -330,23 +341,14 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
 private[streaming]
 object FileInputDStream {
 
-  /**
-   * Minimum duration of remembering the information of selected files. Defaults to 1 minute.
-   *
-   * Files with mod times older than this "window" of remembering will be ignored. So if new
-   * files are visible within this window, then the file will get selected in the next batch.
-   */
-  private val minRememberDurationMin = Minutes(new SparkConf()
-    .get("spark.streaming.minRememberDurationMin", "1")
-    .toLong)
-
   def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
 
   /**
    * Calculate the number of last batches to remember, such that all the files selected in
    * at least last minRememberDurationMin duration can be remembered.
    */
-  def calculateNumBatchesToRemember(batchDuration: Duration): Int = {
+  def calculateNumBatchesToRemember(batchDuration: Duration,
+                                    minRememberDurationMin: Duration): Int = {
     math.ceil(minRememberDurationMin.milliseconds.toDouble / batchDuration.milliseconds).toInt
   }
 }
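
For reference, a minimal sketch of the arithmetic this patch parameterizes. The formula is copied verbatim from calculateNumBatchesToRemember above; the 10-second batch interval is a hypothetical value chosen for illustration, not part of the patch:

    import org.apache.spark.streaming.{Duration, Minutes, Seconds}

    // Hypothetical setup: a 10-second batch interval with the default
    // "spark.streaming.minRememberDurationMin" of 1 minute.
    val batchDuration: Duration = Seconds(10)
    val minRememberDurationMin: Duration = Minutes(1)

    // Same computation as FileInputDStream.calculateNumBatchesToRemember:
    // ceil(60000 ms / 10000 ms) = 6 batches to remember.
    val numBatchesToRemember =
      math.ceil(minRememberDurationMin.milliseconds.toDouble / batchDuration.milliseconds).toInt

    // FileInputDStream then calls remember(slideDuration * numBatchesToRemember),
    // i.e. it retains 10 s * 6 = 60 s of selected-file metadata, so files whose
    // mod times fall inside that window can still be picked up by a later batch.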