Commit: update

brkyvz committed Nov 10, 2015
1 parent 7f8cfe3 commit 98da092
Showing 4 changed files with 77 additions and 79 deletions.
FileBasedWriteAheadLog.scala
@@ -17,11 +17,11 @@
package org.apache.spark.streaming.util

import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentSkipListSet
import java.util.{Iterator => JIterator}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.{ThreadPoolTaskSupport, ForkJoinTaskSupport}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.postfixOps

@@ -58,8 +58,8 @@ private[streaming] class FileBasedWriteAheadLog(
private val callerNameTag = getCallerName.map(c => s" for $c").getOrElse("")

private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
implicit private val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName))
private val threadpool = ThreadUtils.newDaemonCachedThreadPool(threadpoolName)
private val executionContext = ExecutionContext.fromExecutorService(threadpool)
override protected val logName = s"WriteAheadLogManager $callerNameTag"

private var currentLogPath: Option[String] = None
@@ -125,13 +125,21 @@ private[streaming] class FileBasedWriteAheadLog(
*/
def readAll(): JIterator[ByteBuffer] = synchronized {
val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))

logFilesToRead.par.map { file =>
logInfo("Reading from the logs:\n" + logFilesToRead.mkString("\n"))
def readFile(file: String): Iterator[ByteBuffer] = {
logDebug(s"Creating log reader with $file")
val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _)
}.flatten.toIterator.asJava
}
if (!closeFileAfterWrite) {
logFilesToRead.iterator.map(readFile).flatten.asJava
} else {
// For performance gains, it makes sense to parallelize the recovery if
// closeFileAfterWrite = true
val parallelFilesToRead = logFilesToRead.par
parallelFilesToRead.tasksupport = new ThreadPoolTaskSupport(threadpool)
parallelFilesToRead.map(readFile).flatten.toIterator.asJava
}
}

/**
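As a standalone illustration of the mechanism the new readAll() path relies on (not part of this patch), the sketch below points a Scala parallel collection at an explicit thread pool through a TaskSupport, so recovery work runs on the write-ahead log's own threads rather than the default ForkJoin pool. It assumes a Scala 2.11-era standard library, where ThreadPoolTaskSupport is still available; the object name, pool size, wal-* file names, and readFile stand-in are all made up for illustration.

import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

import scala.collection.parallel.ThreadPoolTaskSupport

object ParallelReadSketch {
  def main(args: Array[String]): Unit = {
    // A small fixed pool standing in for ThreadUtils.newDaemonCachedThreadPool.
    val pool = new ThreadPoolExecutor(
      4, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable]())

    // Stand-in for readFile above: pretend each log file yields two records.
    def readFile(file: String): Iterator[String] =
      Iterator(s"$file#record-0", s"$file#record-1")

    val files = Seq("wal-0", "wal-1", "wal-2", "wal-3").par
    // Without this assignment the parallel collection would run on the default
    // ForkJoin pool; assigning tasksupport routes its tasks to our executor.
    files.tasksupport = new ThreadPoolTaskSupport(pool)

    val records = files.flatMap(readFile).toList
    println(records)
    pool.shutdown()
  }
}

Keeping the task support tied to the WAL's own pool bounds how many reader threads recovery can spawn, which is the point of wiring threadpool into both the ExecutionContext and the parallel collection in the patch above.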
@@ -147,10 +155,13 @@
* asynchronously.
*/
def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
val oldLogFiles = synchronized {
val expiredLogs = pastLogs.filter { _.endTime < threshTime }
pastLogs --= expiredLogs
expiredLogs
}
logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
synchronized { pastLogs --= oldLogFiles }
def deleteFile(walInfo: LogInfo): Unit = {
try {
val path = new Path(walInfo.path)
@@ -165,7 +176,7 @@
}
oldLogFiles.foreach { logInfo =>
if (!executionContext.isShutdown) {
val f = Future { deleteFile(logInfo) }
val f = Future { deleteFile(logInfo) }(executionContext)
if (waitForCompletion) {
import scala.concurrent.duration._
Await.ready(f, 1 second)
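The executionContext field above also lost its implicit modifier, which is why clean() now hands it to Future.apply explicitly. A minimal standalone sketch of that pattern, not from this patch, with an illustrative object name, pool size, and file path:

import java.util.concurrent.Executors

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object AsyncDeleteSketch {
  def main(args: Array[String]): Unit = {
    val executionContext =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

    // Stand-in for the WAL's deleteFile helper.
    def deleteFile(path: String): Unit = println(s"deleting $path")

    // Explicit second argument list: no implicit ExecutionContext needs to be in scope.
    val f = Future { deleteFile("wal-0") }(executionContext)

    // Callers that request waitForCompletion block briefly, mirroring the
    // Await.ready(f, 1 second) call in clean().
    Await.ready(f, 1.second)
    executionContext.shutdown()
  }
}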
FileBasedWriteAheadLogReader.scala
@@ -60,10 +60,10 @@ private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Config
"this should be okay.", e)
close()
if (HdfsUtils.checkFileExists(path, conf)) {
// if file exists, this could be a legitimate error
// If file exists, this could be a legitimate error
throw e
} else {
// file was deleted. This can occur when the daemon cleanup thread takes time to
// File was deleted. This can occur when the daemon cleanup thread takes time to
// delete the file during recovery.
false
}
ReceivedBlockTrackerSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming

import java.io.File
import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
@@ -32,7 +33,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.util.{WriteAheadLogSuite, WriteAheadLogUtils, FileBasedWriteAheadLogReader}
import org.apache.spark.streaming.util._
import org.apache.spark.streaming.util.WriteAheadLogSuite._
import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}

@@ -208,83 +209,71 @@ class ReceivedBlockTrackerSuite
}

test("parallel file deletion in FileBasedWriteAheadLog is robust to deletion error") {
val manualClock = new ManualClock
conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1")
require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) === 1)
val tracker = createTracker(clock = manualClock)

val addBlocks = generateBlockInfos()
val batch1 = addBlocks.slice(0, 1)
val batch2 = addBlocks.slice(1, 3)
val batch3 = addBlocks.slice(3, 6)

def advanceTime(): Unit = manualClock.advance(1000)
val batch3 = addBlocks.slice(3, addBlocks.length)

assert(getWriteAheadLogFiles().length === 0)

val start = manualClock.getTimeMillis()
manualClock.advance(500)
tracker.cleanupOldBatches(start, waitForCompletion = false)
assert(getWriteAheadLogFiles().length === 1)
advanceTime()
batch1.foreach(tracker.addBlock)
// list of timestamps for files
val t = Seq.tabulate(5)(i => i * 1000)

writeEventsManually(getLogFileName(t(0)), Seq(createBatchCleanup(t(0))))
assert(getWriteAheadLogFiles().length === 1)
advanceTime()

val batch1Time = manualClock.getTimeMillis()
tracker.allocateBlocksToBatch(batch1Time)
advanceTime()
// The goal is to create several log files which should have been cleaned up.
// If we face any issue during recovery, because these old files exist, then we need to make
// deletion more robust rather than a parallelized operation where we fire and forget
val batch1Allocation = createBatchAllocation(t(1), batch1)
writeEventsManually(getLogFileName(t(1)), batch1.map(BlockAdditionEvent) :+ batch1Allocation)

batch2.foreach { block =>
tracker.addBlock(block)
advanceTime()
}
assert(getWriteAheadLogFiles().length === 3)
writeEventsManually(getLogFileName(t(2)), Seq(createBatchCleanup(t(1))))

advanceTime()
val batch2Allocation = createBatchAllocation(t(3), batch2)
writeEventsManually(getLogFileName(t(3)), batch2.map(BlockAdditionEvent) :+ batch2Allocation)

val batch2Time = manualClock.getTimeMillis()
tracker.allocateBlocksToBatch(batch2Time)
writeEventsManually(getLogFileName(t(4)), batch3.map(BlockAdditionEvent))

advanceTime()
// We should have 5 different log files as we called `writeEventsManually` with 5 different
// timestamps
assert(getWriteAheadLogFiles().length === 5)

assert(getWriteAheadLogFiles().length === 4)
tracker.cleanupOldBatches(batch1Time, waitForCompletion = true)
assert(getWriteAheadLogFiles().length === 3)
// Create the tracker to recover from the log files. We're going to ask the tracker to clean
// things up, and then we're going to rewrite that data, and recover using a different tracker.
// They should have identical data no matter what
val tracker = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))

batch3.foreach { block =>
tracker.addBlock(block)
advanceTime()
def compareTrackers(base: ReceivedBlockTracker, subject: ReceivedBlockTracker): Unit = {
subject.getBlocksOfBatchAndStream(t(3), streamId) should be(
base.getBlocksOfBatchAndStream(t(3), streamId))
subject.getBlocksOfBatchAndStream(t(1), streamId) should be(
base.getBlocksOfBatchAndStream(t(1), streamId))
subject.getBlocksOfBatchAndStream(t(0), streamId) should be(Nil)
}
val batch3Time = manualClock.getTimeMillis()
tracker.allocateBlocksToBatch(batch3Time)

advanceTime()
assert(getWriteAheadLogFiles().length === 4)
advanceTime()
tracker.cleanupOldBatches(batch2Time, waitForCompletion = true)
// ask the tracker to clean up some old files
tracker.cleanupOldBatches(t(3), waitForCompletion = true)
assert(getWriteAheadLogFiles().length === 3)

def compareTrackers(base: ReceivedBlockTracker, subject: ReceivedBlockTracker): Unit = {
subject.getBlocksOfBatchAndStream(batch3Time, streamId) should be(
base.getBlocksOfBatchAndStream(batch3Time, streamId))
subject.getBlocksOfBatchAndStream(batch2Time, streamId) should be(
base.getBlocksOfBatchAndStream(batch2Time, streamId))
subject.getBlocksOfBatchAndStream(batch1Time, streamId) should be(Nil)
}

val tracker2 = createTracker(recoverFromWriteAheadLog = true, clock = manualClock)
val tracker2 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
compareTrackers(tracker, tracker2)

WriteAheadLogSuite.writeEventsUsingWriter(getLogFileName(start), Seq(createBatchCleanup(start)))

// rewrite first file
writeEventsManually(getLogFileName(t(0)), Seq(createBatchCleanup(t(0))))
assert(getWriteAheadLogFiles().length === 4)
val tracker3 = createTracker(recoverFromWriteAheadLog = true, clock = manualClock)
// make sure trackers are consistent
val tracker3 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
compareTrackers(tracker, tracker3)
WriteAheadLogSuite.writeEventsUsingWriter(getLogFileName(batch1Time),
Seq(createBatchAllocation(batch1Time, batch1)))

// rewrite second file
writeEventsManually(getLogFileName(t(1)), batch1.map(BlockAdditionEvent) :+ batch1Allocation)
assert(getWriteAheadLogFiles().length === 5)
val tracker4 = createTracker(recoverFromWriteAheadLog = true, clock = manualClock)
// make sure trackers are consistent
val tracker4 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
compareTrackers(tracker, tracker4)
}

@@ -309,6 +298,19 @@ class ReceivedBlockTrackerSuite
BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
}

/**
* Write received block tracker events to a file manually.
*/
def writeEventsManually(filePath: String, events: Seq[ReceivedBlockTrackerLogEvent]): Unit = {
val writer = HdfsUtils.getOutputStream(filePath, hadoopConf)
events.foreach { event =>
val bytes = Utils.serialize(event)
writer.writeInt(bytes.size)
writer.write(bytes)
}
writer.close()
}
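The helper above writes each event as a 4-byte length prefix followed by the serialized payload, which is the framing the write-ahead log reader then recovers. A self-contained sketch of that round trip, not part of the patch, using plain java.io streams and strings in place of HdfsUtils and Utils.serialize; the object name, temp file, and payloads are illustrative.

import java.io.{DataInputStream, DataOutputStream, EOFException, File, FileInputStream, FileOutputStream}

object LengthPrefixedRecordsSketch {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("wal-sketch", ".log")

    // Write: a length prefix, then the payload, for each record.
    val out = new DataOutputStream(new FileOutputStream(file))
    Seq("event-1", "event-2").foreach { event =>
      val bytes = event.getBytes("UTF-8")
      out.writeInt(bytes.length)
      out.write(bytes)
    }
    out.close()

    // Read: keep pulling (length, payload) pairs until end of file.
    val in = new DataInputStream(new FileInputStream(file))
    val events = scala.collection.mutable.ArrayBuffer.empty[String]
    try {
      while (true) {
        val length = in.readInt()        // throws EOFException at end of file
        val buffer = new Array[Byte](length)
        in.readFully(buffer)
        events += new String(buffer, "UTF-8")
      }
    } catch {
      case _: EOFException => // reached the end of the log
    } finally {
      in.close()
    }

    assert(events.toList == List("event-1", "event-2"))
    file.delete()
  }
}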

/** Get all the data written in the given write ahead log file. */
def getWrittenLogData(logFile: String): Seq[ReceivedBlockTrackerLogEvent] = {
getWrittenLogData(Seq(logFile))
WriteAheadLogSuite.scala
@@ -367,21 +367,6 @@ object WriteAheadLogSuite {
segments
}

/**
* Write received block tracker events to a file using the writer class and return an array of
* the file segments written.
*/
def writeEventsUsingWriter(
filePath: String,
events: Seq[ReceivedBlockTrackerLogEvent]): Seq[FileBasedWriteAheadLogSegment] = {
val writer = new FileBasedWriteAheadLogWriter(filePath, hadoopConf)
val segments = events.map {
item => writer.write(ByteBuffer.wrap(Utils.serialize(item)))
}
writer.close()
segments
}

/** Write data to rotating files in log directory using the WriteAheadLog class. */
def writeDataUsingWriteAheadLog(
logDirectory: String,
