[SPARK-11419][STREAMING] Parallel recovery for FileBasedWriteAheadLog + minor recovery tweaks #9373

Closed
wants to merge 21 commits into from
@@ -88,8 +88,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
if (eventLoop == null) return // scheduler has already been stopped
logDebug("Stopping JobScheduler")

// First, stop receiving
receiverTracker.stop(processAllReceivedData)
if (receiverTracker != null) {
// First, stop receiving
receiverTracker.stop(processAllReceivedData)
Contributor Author: NPE thrown when streaming context is stopped before recovery is complete

}

// Second, stop generating jobs. If it has to process all received data,
// then this will wait for all the processing through JobScheduler to be over.
@@ -17,10 +17,12 @@
package org.apache.spark.streaming.util

import java.nio.ByteBuffer
import java.util.concurrent.ThreadPoolExecutor
import java.util.{Iterator => JIterator}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.ThreadPoolTaskSupport
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.postfixOps

@@ -57,8 +59,8 @@ private[streaming] class FileBasedWriteAheadLog(
private val callerNameTag = getCallerName.map(c => s" for $c").getOrElse("")

private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
implicit private val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName))
private val threadpool = ThreadUtils.newDaemonCachedThreadPool(threadpoolName, 20)
private val executionContext = ExecutionContext.fromExecutorService(threadpool)
override protected val logName = s"WriteAheadLogManager $callerNameTag"

private var currentLogPath: Option[String] = None
@@ -124,13 +126,19 @@
*/
def readAll(): JIterator[ByteBuffer] = synchronized {
val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))

logFilesToRead.iterator.map { file =>
logInfo("Reading from the logs:\n" + logFilesToRead.mkString("\n"))
def readFile(file: String): Iterator[ByteBuffer] = {
logDebug(s"Creating log reader with $file")
val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _)
}.flatten.asJava
}
if (!closeFileAfterWrite) {
logFilesToRead.iterator.map(readFile).flatten.asJava
} else {
// For performance gains, it makes sense to parallelize the recovery if
// closeFileAfterWrite = true
seqToParIterator(threadpool, logFilesToRead, readFile).asJava
}
}

/**
@@ -146,30 +154,33 @@
* asynchronously.
*/
def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
val oldLogFiles = synchronized {
val expiredLogs = pastLogs.filter { _.endTime < threshTime }
pastLogs --= expiredLogs
expiredLogs
}
logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")

def deleteFiles() {
oldLogFiles.foreach { logInfo =>
try {
val path = new Path(logInfo.path)
val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
fs.delete(path, true)
synchronized { pastLogs -= logInfo }
logDebug(s"Cleared log file $logInfo")
} catch {
case ex: Exception =>
logWarning(s"Error clearing write ahead log file $logInfo", ex)
}
def deleteFile(walInfo: LogInfo): Unit = {
Contributor: nit: why rename this to walInfo?

Contributor: nit: empty line missing.

Contributor Author: logInfo is Spark's logging method, so keeping logInfo as the parameter name here would shadow it.

try {
val path = new Path(walInfo.path)
val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
fs.delete(path, true)
logDebug(s"Cleared log file $walInfo")
} catch {
case ex: Exception =>
logWarning(s"Error clearing write ahead log file $walInfo", ex)
}
logInfo(s"Cleared log files in $logDirectory older than $threshTime")
}
if (!executionContext.isShutdown) {
val f = Future { deleteFiles() }
if (waitForCompletion) {
import scala.concurrent.duration._
Await.ready(f, 1 second)
oldLogFiles.foreach { logInfo =>
Contributor: nit: when waitForCompletion is true, this whole deletion is done one file at a time. It might be better to compose the per-file futures with Future.sequence into a single Future to wait on, so they run in parallel (see the sketch after this hunk). Not really a problem, since waitForCompletion is true only in tests; do this only if there is other critical feedback.

if (!executionContext.isShutdown) {
val f = Future { deleteFile(logInfo) }(executionContext)
if (waitForCompletion) {
import scala.concurrent.duration._
Await.ready(f, 1 second)
}
}
}
}
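To make the reviewer's suggestion concrete, here is a rough sketch (not code from this PR) of how the per-file deletions could be composed with Future.sequence so that a single await covers all of them while they still run in parallel. The names oldLogFiles, deleteFile, waitForCompletion and executionContext are the values already defined in clean().

// Illustrative only: an alternative shape for the deletion loop suggested in review.
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

if (!executionContext.isShutdown) {
  implicit val ec: ExecutionContext = executionContext
  // One Future per expired log file; they all run on the cached thread pool.
  val deletions = oldLogFiles.map { walInfo => Future { deleteFile(walInfo) } }
  if (waitForCompletion) {
    // Future.sequence collapses the collection of futures into a single Future we can await once.
    Await.ready(Future.sequence(deletions), 1.second)
  }
}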
@@ -251,4 +262,23 @@ private[streaming] object FileBasedWriteAheadLog {
}
}.sortBy { _.startTime }
}

/**
* This creates an iterator from a parallel collection, by keeping at most `n` objects in memory
* at any given time, where `n` is the size of the thread pool. This is crucial for use cases
* where we create `FileBasedWriteAheadLogReader`s during parallel recovery. We don't want to
* open up `k` streams altogether where `k` is the size of the Seq that we want to parallelize.
*/
def seqToParIterator[I, O](
tpool: ThreadPoolExecutor,
source: Seq[I],
handler: I => Iterator[O]): Iterator[O] = {
val taskSupport = new ThreadPoolTaskSupport(tpool)
val groupSize = tpool.getMaximumPoolSize.max(8)
source.grouped(groupSize).flatMap { group =>
val parallelCollection = group.par
parallelCollection.tasksupport = taskSupport
parallelCollection.map(handler)
}.flatten
}
}
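As a rough, hypothetical illustration of what seqToParIterator buys us (not part of this PR, and ignoring the private[streaming] visibility of the object): because the groups are consumed lazily through the returned iterator, at most one group of handlers runs at a time, so at most groupSize readers are ever open concurrently. The file names and println below are made up.

import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

object SeqToParIteratorDemo {
  def main(args: Array[String]): Unit = {
    val pool = new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable])
    val files = (1 to 20).map(i => s"log-$i")  // stand-ins for WAL file paths
    // In the real code the handler opens a FileBasedWriteAheadLogReader; here it just
    // records the "open" and yields two dummy records per file.
    def open(file: String): Iterator[String] = {
      println(s"opening $file")
      Iterator(s"$file:rec1", s"$file:rec2")
    }
    val records = FileBasedWriteAheadLog.seqToParIterator(pool, files, open)
    // Only the first group of files is opened to satisfy this take(3); the rest are
    // opened lazily as the iterator is consumed.
    records.take(3).foreach(println)
    pool.shutdown()
  }
}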
@@ -30,7 +30,7 @@ private[streaming] class FileBasedWriteAheadLogRandomReader(path: String, conf:
extends Closeable {

private val instream = HdfsUtils.getInputStream(path, conf)
private var closed = false
private var closed = (instream == null) // the file may be deleted as we're opening the stream

def read(segment: FileBasedWriteAheadLogSegment): ByteBuffer = synchronized {
assertOpen()
@@ -16,7 +16,7 @@
*/
package org.apache.spark.streaming.util

import java.io.{Closeable, EOFException}
import java.io.{IOException, Closeable, EOFException}
import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration
@@ -32,7 +32,7 @@ private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Config
extends Iterator[ByteBuffer] with Closeable with Logging {

private val instream = HdfsUtils.getInputStream(path, conf)
private var closed = false
private var closed = (instream == null) // the file may be deleted as we're opening the stream
private var nextItem: Option[ByteBuffer] = None

override def hasNext: Boolean = synchronized {
@@ -55,6 +55,19 @@
logDebug("Error reading next item, EOF reached", e)
close()
false
case e: IOException =>
logWarning("Error while trying to read data. If the file was deleted, " +
"this should be okay.", e)
close()
if (HdfsUtils.checkFileExists(path, conf)) {
// If file exists, this could be a legitimate error
throw e
} else {
// File was deleted. This can occur when the daemon cleanup thread takes time to
// delete the file during recovery.
false
}

case e: Exception =>
logWarning("Error while trying to read data from HDFS.", e)
close()
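The hasNext change above follows a simple rule: an IOException during a read is fatal only if the file still exists; otherwise the file was cleaned up underneath the reader and the log is treated as exhausted. Below is a standalone sketch of that rule with hypothetical names (readOrSkipDeleted is not a Spark API); the real code uses HdfsUtils.checkFileExists as shown above.

import java.io.IOException

object DeletedFileHandling {
  // Hypothetical helper mirroring the new error handling in FileBasedWriteAheadLogReader.hasNext.
  def readOrSkipDeleted[T](path: String, fileExists: String => Boolean)(read: => T): Option[T] = {
    try Some(read)
    catch {
      case e: IOException if !fileExists(path) =>
        // The file vanished (e.g. a leftover cleanup thread deleted it during recovery);
        // treat the log as empty instead of failing recovery.
        None
      case e: IOException =>
        // The file is still there, so this is a genuine read error worth surfacing.
        throw e
    }
  }
}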
@@ -16,6 +16,8 @@
*/
package org.apache.spark.streaming.util

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

@@ -42,8 +44,19 @@ private[streaming] object HdfsUtils {
def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
val dfsPath = new Path(path)
val dfs = getFileSystemForPath(dfsPath, conf)
val instream = dfs.open(dfsPath)
instream
if (dfs.isFile(dfsPath)) {
try {
dfs.open(dfsPath)
} catch {
case e: IOException =>
// If we are really unlucky, the file may be deleted as we're opening the stream.
// This can happen as clean up is performed by daemon threads that may be left over from
// previous runs.
if (!dfs.isFile(dfsPath)) null else throw e
}
} else {
null
}
}

def checkState(state: Boolean, errorMsg: => String) {
@@ -71,4 +84,11 @@
case _ => fs
}
}

/** Check if the file exists at the given path. */
def checkFileExists(path: String, conf: Configuration): Boolean = {
val hdpPath = new Path(path)
val fs = getFileSystemForPath(hdpPath, conf)
fs.isFile(hdpPath)
}
}
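Between the null return from getInputStream and the closed = (instream == null) initializers in the two readers, the PR treats a file deleted mid-open as an already-exhausted log rather than an error. A minimal standalone sketch of the same pattern on a local filesystem follows; BestEffortLogReader and the use of FileInputStream are made up for illustration only.

import java.io.{FileInputStream, FileNotFoundException, InputStream}

// Illustration only: the "deleted while opening" pattern on a local file.
class BestEffortLogReader(path: String) {
  // Null means the file disappeared between scheduling the read and opening it,
  // mirroring what HdfsUtils.getInputStream now returns for vanished WAL files.
  private val in: InputStream =
    try new FileInputStream(path)
    catch { case _: FileNotFoundException => null }

  // A reader over a vanished file starts out closed, so hasNext is immediately false.
  private var closed = (in == null)

  def hasNext: Boolean = !closed

  def close(): Unit = {
    if (in != null) in.close()
    closed = true
  }
}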
@@ -18,6 +18,7 @@
package org.apache.spark.streaming

import java.io.File
import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
@@ -32,7 +33,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.util.{WriteAheadLogUtils, FileBasedWriteAheadLogReader}
import org.apache.spark.streaming.util._
import org.apache.spark.streaming.util.WriteAheadLogSuite._
import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}

@@ -207,6 +208,75 @@ class ReceivedBlockTrackerSuite
tracker1.isWriteAheadLogEnabled should be (false)
}

test("parallel file deletion in FileBasedWriteAheadLog is robust to deletion error") {
conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1")
require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) === 1)

val addBlocks = generateBlockInfos()
val batch1 = addBlocks.slice(0, 1)
val batch2 = addBlocks.slice(1, 3)
val batch3 = addBlocks.slice(3, addBlocks.length)

assert(getWriteAheadLogFiles().length === 0)

Contributor: Can you add inline comments to explain each step, so that the reader can understand what's going on?

// list of timestamps for files
val t = Seq.tabulate(5)(i => i * 1000)

writeEventsManually(getLogFileName(t(0)), Seq(createBatchCleanup(t(0))))
assert(getWriteAheadLogFiles().length === 1)

// The goal is to create several log files which should have been cleaned up.
// If we face any issue during recovery, because these old files exist, then we need to make
// deletion more robust rather than a parallelized operation where we fire and forget
val batch1Allocation = createBatchAllocation(t(1), batch1)
writeEventsManually(getLogFileName(t(1)), batch1.map(BlockAdditionEvent) :+ batch1Allocation)

writeEventsManually(getLogFileName(t(2)), Seq(createBatchCleanup(t(1))))

val batch2Allocation = createBatchAllocation(t(3), batch2)
writeEventsManually(getLogFileName(t(3)), batch2.map(BlockAdditionEvent) :+ batch2Allocation)

writeEventsManually(getLogFileName(t(4)), batch3.map(BlockAdditionEvent))

// We should have 5 different log files as we called `writeEventsManually` with 5 different
// timestamps
assert(getWriteAheadLogFiles().length === 5)

// Create the tracker to recover from the log files. We're going to ask the tracker to clean
// things up, and then we're going to rewrite that data, and recover using a different tracker.
// They should have identical data no matter what
val tracker = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))

def compareTrackers(base: ReceivedBlockTracker, subject: ReceivedBlockTracker): Unit = {
subject.getBlocksOfBatchAndStream(t(3), streamId) should be(
base.getBlocksOfBatchAndStream(t(3), streamId))
subject.getBlocksOfBatchAndStream(t(1), streamId) should be(
base.getBlocksOfBatchAndStream(t(1), streamId))
subject.getBlocksOfBatchAndStream(t(0), streamId) should be(Nil)
}

// ask the tracker to clean up some old files
tracker.cleanupOldBatches(t(3), waitForCompletion = true)
assert(getWriteAheadLogFiles().length === 3)

val tracker2 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
compareTrackers(tracker, tracker2)

// rewrite first file
writeEventsManually(getLogFileName(t(0)), Seq(createBatchCleanup(t(0))))
assert(getWriteAheadLogFiles().length === 4)
// make sure trackers are consistent
val tracker3 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
compareTrackers(tracker, tracker3)

// rewrite second file
writeEventsManually(getLogFileName(t(1)), batch1.map(BlockAdditionEvent) :+ batch1Allocation)
assert(getWriteAheadLogFiles().length === 5)
// make sure trackers are consistent
val tracker4 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
compareTrackers(tracker, tracker4)
}

/**
* Create tracker object with the optional provided clock. Use fake clock if you
* want to control time by manually incrementing it to test log clean.
@@ -228,11 +298,30 @@
BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
}

/**
* Write received block tracker events to a file manually.
*/
def writeEventsManually(filePath: String, events: Seq[ReceivedBlockTrackerLogEvent]): Unit = {
val writer = HdfsUtils.getOutputStream(filePath, hadoopConf)
events.foreach { event =>
val bytes = Utils.serialize(event)
writer.writeInt(bytes.size)
writer.write(bytes)
}
writer.close()
}

/** Get all the data written in the given write ahead log file. */
def getWrittenLogData(logFile: String): Seq[ReceivedBlockTrackerLogEvent] = {
getWrittenLogData(Seq(logFile))
}

/** Get the log file name for the given log start time. */
def getLogFileName(time: Long, rollingIntervalSecs: Int = 1): String = {
checkpointDirectory.toString + File.separator + "receivedBlockMetadata" +
File.separator + s"log-$time-${time + rollingIntervalSecs * 1000}"
}

/**
* Get all the data written in the given write ahead log files. By default, it will read all
* files in the test log directory.