[SPARK-11419][STREAMING] Parallel recovery for FileBasedWriteAheadLog + minor recovery tweaks #9373
Conversation
Test build #44665 has finished for PR 9373 at commit
-    receiverTracker.stop(processAllReceivedData)
+    if (receiverTracker != null) {
+      // First, stop receiving
+      receiverTracker.stop(processAllReceivedData)
NPE thrown when streaming context is stopped before recovery is complete
Test build #44666 has finished for PR 9373 at commit
@@ -17,6 +17,7 @@
 package org.apache.spark.streaming.util

 import java.nio.ByteBuffer
+import java.util.concurrent.ConcurrentSkipListSet
NoteToSelf: Remove
@harishreedharan Here are some benchmark results:
Did you try HDFS? I am assuming we'd get similar speed ups there too. What I am wondering is if we'd actually ever have to deal with that many files. If this adds only a small cost or if it becomes faster, then let's keep it.

Thanks,
oldLogFiles.foreach { logInfo =>
  if (!executionContext.isShutdown) {
    val f = Future { deleteFile(logInfo) }
    Await.ready(f, 1 second)
Again, this should not use the default execution context. Please create a dedicated execution context for this.
The execution context was defined implicitly in the class definition. I made it non-implicit for better readability.
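For reference, a dedicated execution context in Scala is usually built from its own named thread pool rather than from the global one. Here is a minimal sketch of the pattern under discussion; the object name, pool size, and deleteAsync helper are illustrative, not code from this PR:

```scala
import java.util.concurrent.Executors

import scala.concurrent.{ExecutionContext, Future}

object WalCleanupPool {
  // Illustrative pool size; a real implementation would tune or configure this.
  private val threadPoolSize = 4

  // A dedicated, non-implicit execution context, so WAL cleanup work neither
  // starves nor is starved by tasks running on the global execution context.
  val executionContext: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(threadPoolSize))

  // Hypothetical helper: schedule a file deletion on the dedicated pool.
  def deleteAsync(path: String): Future[Unit] =
    Future { println(s"deleting $path") }(executionContext)
}
```

Keeping the context non-implicit also forces each call site to say which pool it runs on, which is what the review comment above is asking for.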
@harishreedharan I've been trying to test this patch, but I just couldn't set up HDFS to work with Spark using the spark-ec2 scripts. Could you please help me set up a cluster with HDFS so that I can benchmark this?
That looks like a Protobuf version incompatibility. I launched the ec2 instances using:
I used to get the following when using
@brkyvz I think there have been issues with Hadoop 2-related stuff in the master branch. Let's talk offline about how to fix it.
@harishreedharan I couldn't test this on HDFS properly. Instead I enabled the parallelization only when
@brkyvz Could you update this PR with master? The batching PR got merged, creating conflicts.
@brkyvz Sounds good, sir. The issue you saw seems to be a protobuf incompatibility - did you compile and run against the same hadoop-2 version (2.2+)?
Test build #45457 has finished for PR 9373 at commit
Test build #45467 has finished for PR 9373 at commit
Test build #45456 has finished for PR 9373 at commit
test this please
Test build #45650 has finished for PR 9373 at commit
Test build #45648 has finished for PR 9373 at commit
  def getMax(): Int = synchronized { max }
}
try {
  val testSeq = 1 to 64
Can you make this 1000 instead of 8 * 8? Just to make sure that we are splitting things right.
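For context, the fragment above appears to be a small synchronized max-tracker the test uses to observe how the input gets split across threads. A rough sketch of that pattern with the suggested larger input; the class name and method bodies are illustrative, not the suite's actual code:

```scala
// A tiny synchronized tracker, as in the test fragment above: records the
// largest value it has seen, so concurrent tasks can report their fan-out.
class MaxTracker {
  private var max = 0
  def add(value: Int): Unit = synchronized { if (value > max) max = value }
  def getMax(): Int = synchronized { max }
}

// With 1000 elements rather than 8 * 8 = 64, uneven or off-by-one splits are
// far more likely to surface, which is the point of the suggestion.
val testSeq = 1 to 1000
```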
Test build #45668 has finished for PR 9373 at commit
@@ -582,6 +620,9 @@ object WriteAheadLogSuite {
     allowBatching: Boolean): Seq[String] = {
   val wal = createWriteAheadLog(logDirectory, closeFileAfterWrite, allowBatching)
   val data = wal.readAll().asScala.map(byteBufferToString).toSeq
+  // The thread pool for parallel recovery gets killed with wal.close(). Therefore we need to
+  // eagerly compute data, otherwise the lazy computation will fail.
+  data.length
Could you just change toSeq to toArray? toArray will drain the Iterator at once.
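The suggestion matters because map on an Iterator is lazy, and on the Scala versions Spark used at the time (2.10/2.11) Iterator.toSeq returns a lazy Stream, so the reads can be deferred past wal.close(). A small illustration of the difference, not the suite's actual code:

```scala
// map on an Iterator is lazy: the body runs only when elements are pulled.
val records: Iterator[String] = Iterator("a", "b", "c").map { r =>
  println(s"reading $r from the WAL") // happens on demand, not here
  r.toUpperCase
}

// On Scala 2.10/2.11, Iterator.toSeq returns a lazy Stream, so most of the
// reads above would happen only when the Seq is traversed, possibly after
// wal.close() has already shut down the recovery thread pool.
val lazySeq: Seq[String] = records.toSeq

// toArray drains the Iterator immediately, so every read completes while
// the log is still open.
val eager: Array[String] = Iterator("a", "b", "c").map(_.toUpperCase).toArray
```

Forcing the sequence with data.length, as in the diff above, achieves the same effect; toArray just does it in one step.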
  case ex: Exception =>
    logWarning(s"Error clearing write ahead log file $logInfo", ex)
}
def deleteFile(walInfo: LogInfo): Unit = {
nit: why rename this to walInfo?
nit: empty line missing.
logInfo is Spark's logging method.
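In other words, the rename avoids shadowing the logInfo(...) method that Spark classes inherit from the Logging trait. A minimal sketch of the collision; the trait below is a stand-in for illustration, not Spark's actual Logging implementation:

```scala
// Stand-in for Spark's Logging trait, which mixes a logInfo(...) method
// into every class that extends it.
trait Logging {
  def logInfo(msg: => String): Unit = println(s"INFO $msg")
}

case class LogInfo(path: String)

class WalCleaner extends Logging {
  // Naming the parameter walInfo keeps the inherited logInfo method visible.
  def deleteFile(walInfo: LogInfo): Unit = {
    // Had the parameter been named logInfo, it would shadow the method and
    // the call below would not compile.
    logInfo(s"Deleting write ahead log file ${walInfo.path}")
  }
}
```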
Test build #45700 has finished for PR 9373 at commit
Test build #45712 has finished for PR 9373 at commit
Test build #45747 has finished for PR 9373 at commit
Test build #45781 has finished for PR 9373 at commit
LGTM. Merging this to master and 1.6. Thanks @brkyvz, @zsxwing and @harishreedharan
The support for closing WriteAheadLog files after writes was just merged in. Closing every file after a write is a very expensive operation as it creates many small files on S3. It's not necessary to enable it on HDFS anyway.
However, when you have many small files on S3, recovery takes a very long time. In addition, files stack up pretty quickly, and deletes may not be able to keep up; therefore, deletes can also be parallelized.
This PR adds support for the two parallelization steps mentioned above, in addition to a couple more failures I encountered during recovery.
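A rough sketch of the recovery-parallelization idea described above: fan the per-file reads out over a dedicated pool so the per-file open/read latency overlaps instead of accumulating sequentially. This is a simplified illustration with a stubbed readFile, not the actual FileBasedWriteAheadLog code:

```scala
import java.util.concurrent.Executors

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object ParallelRecoverySketch {
  // Hypothetical stand-in for reading every record out of one WAL file.
  def readFile(path: String): Seq[String] = Seq(s"records from $path")

  // Read many small log files concurrently instead of one after another.
  def readAllParallel(logFiles: Seq[String], numThreads: Int): Seq[String] = {
    val pool = Executors.newFixedThreadPool(numThreads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // One future per file; on S3 this overlaps the per-file latency
      // that dominates recovery when there are many small files.
      val futures = logFiles.map(path => Future(readFile(path)))
      Await.result(Future.sequence(futures), Duration.Inf).flatten
    } finally {
      pool.shutdown()
    }
  }
}
```

As the thread above notes, this only pays off when recovery has to touch many small files; on HDFS, with larger files, the sequential path is already adequate, which is why the parallel path is enabled selectively.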