[SPARK-18406][CORE] Race between end-of-task and completion iterator read lock release #18076
Conversation
```diff
@@ -501,14 +501,18 @@ private[spark] class BlockManager(
       case Some(info) =>
         val level = info.level
         logDebug(s"Level for block $blockId is $level")
+        val taskAttemptId = Option(TaskContext.get()).map(_.taskAttemptId())
+          .getOrElse(BlockInfo.NON_TASK_WRITER)
```
I think we can leave out the `.getOrElse` here and just pass the `Option` itself into `releaseLock`. This helps to avoid exposing `BlockInfo.NON_TASK_WRITER` here. Not a huge deal, but just a minor nit.
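A minimal sketch of that suggestion, assuming the two-argument `releaseLock(blockId, taskAttemptId: Option[Long])` overload this PR introduces:

```scala
// Keep the Option as-is and let releaseLock resolve the "no task context"
// case internally, so BlockInfo.NON_TASK_WRITER never leaks into this call site.
val taskAttemptId: Option[Long] = Option(TaskContext.get()).map(_.taskAttemptId())
// ...then pass the Option straight through:
releaseLock(blockId, taskAttemptId)
```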
```diff
         if (level.useMemory && memoryStore.contains(blockId)) {
           val iter: Iterator[Any] = if (level.deserialized) {
             memoryStore.getValues(blockId).get
           } else {
             serializerManager.dataDeserializeStream(
               blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
           }
           val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
```
I'd add a one-line comment before this line which references SPARK-18406, something like "We need to capture the current taskId in case the iterator completion is triggered from a different thread which does not have TaskContext set; see SPARK-18406 for discussion" or similar.
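Applied to the line above, the suggestion might look like this (a sketch, assuming the `taskAttemptId` option captured earlier in the method):

```scala
// We need to capture the current taskId in case the iterator completion is
// triggered from a different thread which does not have TaskContext set; see
// SPARK-18406 for discussion.
val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId, taskAttemptId))
```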
```diff
@@ -713,8 +718,15 @@ private[spark] class BlockManager(
   /**
    * Release a lock on the given block.
    */
-  def releaseLock(blockId: BlockId): Unit = {
-    blockInfoManager.unlock(blockId)
-  }
+  def releaseLock(blockId: BlockId): Unit = releaseLock(blockId, taskAttemptId = None)
```
Why do we need to overload here? Why not just have a single `releaseLock` method with a default argument?
In fact, `BlockManager` extends `BlockDataManager`, so it has to override the `releaseLock(blockId: BlockId)` method; thus we keep this signature and implement a new method that accepts the extra `taskAttemptId` argument.
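A self-contained sketch of the constraint (with `String` standing in for `BlockId`): a default argument changes the method's signature, so it would not implement the trait's abstract `releaseLock(blockId)`, which is why the one-argument form is kept alongside a new overload:

```scala
trait BlockDataManager {
  def releaseLock(blockId: String): Unit
}

class BlockManager extends BlockDataManager {
  // Required to satisfy the trait: a single method with a defaulted
  // Option[Long] parameter has a different signature and would leave
  // the abstract member unimplemented.
  override def releaseLock(blockId: String): Unit =
    releaseLock(blockId, taskAttemptId = None)

  // New overload carrying the explicit task attempt id.
  def releaseLock(blockId: String, taskAttemptId: Option[Long]): Unit = {
    val tid = taskAttemptId.getOrElse(-1L) // stand-in for BlockInfo.NON_TASK_WRITER
    println(s"unlocking $blockId for task $tid")
  }
}
```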
I think there's only one implementation of `BlockDataManager` these days, though? Since that's an internal interface, maybe we could change it there, too?
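A hypothetical sketch of that alternative, with `String` again standing in for `BlockId`:

```scala
// If the internal interface itself carried the task attempt id, the
// one-argument override (and thus the overload) would disappear.
trait BlockDataManager {
  def releaseLock(blockId: String, taskAttemptId: Option[Long] = None): Unit
}
```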
```scala
rdd.cache()

rdd.mapPartitions { iter =>
  ThreadUtils.runInNewThread("TestThread") {
```
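The excerpt above is cut off; a hedged reconstruction of the full regression test, based on this PR's description (the exact data sizes and body are assumptions, not the verbatim test):

```scala
test("SPARK-18406: race between end-of-task and completion iterator read lock release") {
  val rdd = sc.parallelize(1 to 1000, 10)
  rdd.cache()

  rdd.mapPartitions { iter =>
    // Drain the cached block's iterator on a separate thread, so the
    // CompletionIterator's lock release fires where TaskContext.get() is null.
    ThreadUtils.runInNewThread("TestThread") {
      while (iter.hasNext) {
        iter.next()
      }
      Seq.empty[Int].iterator
    }
  }.collect()
}
```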
Nice use of this helper method. I wasn't aware of this, but it's pretty nice. I'll use it in my own tests going forward.
This is a good, clean fix. I left a couple of review comments, but they're only minor stylistic comments, not correctness issues. I checked, and it looks like this fixes both occurrences of releasing locks in completion iterators. The only other case I can think of testing is making sure that …
Test build #77266 has finished for PR 18076 at commit …
LGTM
Test build #77274 has finished for PR 18076 at commit …
LGTM as well.
Test build #77278 has finished for PR 18076 at commit …
…read lock release

## What changes were proposed in this pull request?

When a TaskContext is not propagated properly to all child threads for the task, as in the cases reported in this issue, we fail to get the TID from the TaskContext, which makes it impossible to release the lock and leads to assertion failures. To resolve this, we have to explicitly pass the TID value to the `unlock` method.

## How was this patch tested?

Added a new failing regression test case in `RDDSuite`.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #18076 from jiangxb1987/completion-iterator.

(cherry picked from commit d76633e)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
thanks, merging to master/2.2! @jiangxb1987 can you send a new PR to backport this to branch 2.1 and 2.0? thanks!
…tion iterator read lock release

This is a backport PR of #18076 to 2.0 and 2.1.

## What changes were proposed in this pull request?

When a TaskContext is not propagated properly to all child threads for the task, as in the cases reported in this issue, we fail to get the TID from the TaskContext, which makes it impossible to release the lock and leads to assertion failures. To resolve this, we have to explicitly pass the TID value to the `unlock` method.

## How was this patch tested?

Added a new failing regression test case in `RDDSuite`.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #18096 from jiangxb1987/completion-iterator-2.0.
…tion iterator read lock release

This is a backport PR of #18076 to 2.1.

## What changes were proposed in this pull request?

When a TaskContext is not propagated properly to all child threads for the task, as in the cases reported in this issue, we fail to get the TID from the TaskContext, which makes it impossible to release the lock and leads to assertion failures. To resolve this, we have to explicitly pass the TID value to the `unlock` method.

## How was this patch tested?

Added a new failing regression test case in `RDDSuite`.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #18099 from jiangxb1987/completion-iterator-2.1.
What changes were proposed in this pull request?

When a TaskContext is not propagated properly to all child threads for the task, as in the cases reported in this issue, we fail to get the TID from the TaskContext, which makes it impossible to release the lock and leads to assertion failures. To resolve this, we have to explicitly pass the TID value to the `unlock` method.

How was this patch tested?

Added a new failing regression test case in `RDDSuite`.
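In code terms, the shape of the fix is roughly the following (a simplified sketch, not the actual `BlockInfoManager` code; the real `unlock` signature may differ):

```scala
import org.apache.spark.TaskContext

object LockReleaseSketch {
  // Stand-in for BlockInfoManager.unlock after this change: callers may pass
  // the task attempt id explicitly instead of relying on the thread-local
  // TaskContext, which is absent on child threads.
  def unlock(blockId: String, taskAttemptId: Option[Long] = None): Unit = {
    val tid = taskAttemptId.orElse(Option(TaskContext.get()).map(_.taskAttemptId()))
    println(s"releasing read lock on $blockId held by task $tid")
  }

  def releaseOnCompletion(blockId: String): () => Unit = {
    // Capture the TID on the task's main thread, while TaskContext is still
    // set, so the completion callback works no matter which thread runs it.
    val tid = Option(TaskContext.get()).map(_.taskAttemptId())
    () => unlock(blockId, tid)
  }
}
```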