-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-4813][Streaming] Fix the issue that ContextWaiter didn't handle 'spurious wakeup' #3661
Conversation
Test build #24302 has started for PR 3661 at commit
|
private val lock = new ReentrantLock() | ||
private val condition = lock.newCondition() | ||
|
||
@GuardedBy("lock") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor point - these are not in the JDK but in a Findbugs library for JSR-305. It's not used in Spark, and happens to be a dependency now. Maybe not worth using it just 1 place?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe not worth using it just 1 place?
So which one do you prefer?
- Use comments to describe such information.
- Use
GuardedBy
from now on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In addition, now Findbugs does not recognize GuardedBy
in Scala codes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's why I brought it up. It's not actually a standard Java annotation (unless someone tells me it just turned up in 8 or something) but part of JSR-305. This is a dependency of Spark core at the moment, but none of the annotations are used. I think we should just not use them instead of using this lib in 1 place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. I changed them to comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, I'm a fan of the FindBugs annotations. I've had trouble getting the various analysis tools to work well with them in Scala, though. +1 to this commenting convention, though; this is very helpful.
Test build #24305 has started for PR 3661 at commit
|
Test build #24302 has finished for PR 3661 at commit
|
Test PASSed. |
Test build #24305 has finished for PR 3661 at commit
|
Test PASSed. |
def notifyStop() = synchronized { | ||
stopped = true | ||
notifyAll() | ||
def notifyError(e: Throwable) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor nit, but mind adding : Unit
here?
Maybe a naive question, but is there a reason why we can't just use |
I wrote the reason in the description: "Used Condition to rewrite ContextWaiter because it provides a convenient API awaitNanos for timeout." |
Test build #24521 has started for PR 3661 at commit
|
Test build #24521 has finished for PR 3661 at commit
|
Test PASSed. |
* Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or | ||
* `false` if the waiting time detectably elapsed before return from the method. | ||
*/ | ||
def waitForStopOrError(timeout: Long = -1): Boolean = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I considered whether this should throw TimeoutException
instead of signaling failure via a boolean, but I guess this is sort of like Object.wait()
or Condition.wait()
, both of which don't throw exceptions.
Also, it looks like the usages of this in StreamingContext
support this theory:
/**
* Wait for the execution to stop. Any exceptions that occurs during the execution
* will be thrown in this thread.
*/
def awaitTermination() {
waiter.waitForStopOrError()
}
/**
* Wait for the execution to stop. Any exceptions that occurs during the execution
* will be thrown in this thread.
* @param timeout time to wait in milliseconds
*/
def awaitTermination(timeout: Long) {
waiter.waitForStopOrError(timeout)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose the documentation for the SparkContext.awaitTermination(timeout)
could be improved to convey what happens when the timeout occurs...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think if it's better to return Boolean
to indicate if it's timeout? Although it will break the source compatibility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another option would be to just add a new def isTerminated: Boolean
method, which would let users write code like
waiter.awaitTermination(1000)
if (!waiter.isTerminated) {
throw Exception(...)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about making awaitTermination
throw a TimeoutException if timeout? It looks a better API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tdas what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that will break a lot of existing programs for people. A lot of time (even in our own test cases) awaitTermination(timeout)
is used for wait for a short period of time before check checking status or something. Currently that times out return quietly. If instead it starts throwing exceptions, then it will completely break some applications. So I strongly advise against changing this behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In hindsight, instead of modeling awaitTermination
against Akka ActorSystem's awaitTermination (which return Unit
) , I should have modeled it like Java ExecutorService's awaitTermination which returns a Boolean
. Now its not possible to change the API without breaking compatiblity. :(
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In hindsight, instead of modeling awaitTermination against Akka ActorSystem's awaitTermination (which return Unit) , I should have modeled it like Java ExecutorService's awaitTermination which returns a Boolean. Now its not possible to change the API without breaking compatiblity. :(
@tdas, Sorry that I forgot to reply you. You said you designed it just like Akka ActorSystem.awaitTermination
. But ActorSystem.awaitTermination will throw a TimeoutException in case of timeout.
/**
* Block current thread until the system has been shutdown, or the specified
* timeout has elapsed. This will block until after all on termination
* callbacks have been run.
*
* @throws TimeoutException in case of timeout
*/
def awaitTermination(timeout: Duration): Unit
This looks good to me. |
Test build #24568 has started for PR 3661 at commit
|
Test build #24568 has finished for PR 3661 at commit
|
Test PASSed. |
This almost looks good. Can you add a unit test in the |
Actually, correction. Can you put the public API |
Test build #24876 has started for PR 3661 at commit
|
I deleted the commit about |
Test build #24876 has finished for PR 3661 at commit
|
Test FAILed. |
Jenkins, retest this please. |
Test build #24881 has started for PR 3661 at commit
|
Test build #24881 has finished for PR 3661 at commit
|
Test PASSed. |
Merging this. Thanks! |
…e 'spurious wakeup' Used `Condition` to rewrite `ContextWaiter` because it provides a convenient API `awaitNanos` for timeout. Author: zsxwing <zsxwing@gmail.com> Closes #3661 from zsxwing/SPARK-4813 and squashes the following commits: 52247f5 [zsxwing] Add explicit unit type be42bcf [zsxwing] Update as per review suggestion e06bd4f [zsxwing] Fix the issue that ContextWaiter didn't handle 'spurious wakeup' (cherry picked from commit 6a89782) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
…e 'spurious wakeup' Used `Condition` to rewrite `ContextWaiter` because it provides a convenient API `awaitNanos` for timeout. Author: zsxwing <zsxwing@gmail.com> Closes #3661 from zsxwing/SPARK-4813 and squashes the following commits: 52247f5 [zsxwing] Add explicit unit type be42bcf [zsxwing] Update as per review suggestion e06bd4f [zsxwing] Fix the issue that ContextWaiter didn't handle 'spurious wakeup' (cherry picked from commit 6a89782) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
…e 'spurious wakeup' Used `Condition` to rewrite `ContextWaiter` because it provides a convenient API `awaitNanos` for timeout. Author: zsxwing <zsxwing@gmail.com> Closes #3661 from zsxwing/SPARK-4813 and squashes the following commits: 52247f5 [zsxwing] Add explicit unit type be42bcf [zsxwing] Update as per review suggestion e06bd4f [zsxwing] Fix the issue that ContextWaiter didn't handle 'spurious wakeup' (cherry picked from commit 6a89782) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Used
Condition
to rewriteContextWaiter
because it provides a convenient APIawaitNanos
for timeout.