fix unprotected access to task_queue_.begin()->first in Scheduler #1084
Conversation
Sorry @Trisfald, I don't see the issue with the previous code. Why is that reference problematic? The lock will be reacquired regardless of the reason for waking, so any access to that timeout should be safe, and the reference points to the timeout itself and not to the task queue, so it shouldn't be an issue since we don't change timeout values. I might be wrong, so please show me a possible theoretical path where it could happen.
Hello! While I ran licode with Helgrind (a tool from the valgrind family), the analyzer kept complaining about concurrent access to the timeout. So I took a first look, made this simple change, and the tool was happy.
I may have found a problematic execution path: two threads running serviceQueue concurrently. Sorry if the example is a bit contrived.
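The numbered example itself isn't preserved above; what follows is a reconstruction of the kind of interleaving being described, not necessarily the original steps:

```cpp
// Reconstruction (assumption: not the thread's original example). Two
// workers, A and B, both inside serviceQueue():
//
// 1. A evaluates task_queue_.begin()->first and passes the resulting
//    reference (a const time_point&) into wait_until.
// 2. wait_until releases the lock and A goes to sleep.
// 3. B wakes, reacquires the lock, and pops the front task:
//        task_queue_.erase(task_queue_.begin());
//    The map node that A's reference points into is destroyed.
// 4. A's wait_until wakes, reacquires the lock, and its implementation
//    re-reads the deadline through the now-dangling reference before
//    deciding whether the wakeup was a real timeout.
// 5. That re-read is a use-after-free: it usually "works", occasionally
//    crashes, and a tool like Helgrind flags it.
```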
There's no chance that two threads would be waiting in wait_until, because Scheduler uses just one thread via io_service.
I see, with 1 thread it's all right. Yet printing n_threads_servicing_queue in Scheduler.cpp gives me 2. Maybe kNumThreadsPerScheduler in ThreadPool.h should be set to 1?
aww, you're right! I don't know why we set it to 2. Given the current code we should use 1 thread instead; otherwise we'd need to do some things differently there, probably not only the reference to begin()->first. Do you agree on iterating on this PR to make this function thread-safe if more changes are needed?
Yes, sure! With this change it should already be thread-safe. I'll look into it a bit more with a threading analyzer to see if there are other issues.
I think we would need to update current_timeout_ inside the while loop, and probably we can make it exist only inside the serviceQueue() function, and not as a member of the class, but I could be wrong. (A sketch of this idea follows the diff below.)
```diff
 while (!stop_requested_ && !task_queue_.empty() &&
-       new_task_scheduled_.wait_until(lock, task_queue_.begin()->first) != std::cv_status::timeout) {
+       new_task_scheduled_.wait_until(lock, current_timeout_) != std::cv_status::timeout) {
```
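For illustration, a minimal sketch of the "local instead of member" suggestion, assuming the member names visible in the diffs (new_task_mutex_, new_task_scheduled_, stop_requested_, task_queue_) and std synchronization primitives; this is hypothetical code, not the PR's actual patch:

```cpp
void Scheduler::serviceQueue() {
  std::unique_lock<std::mutex> lock(new_task_mutex_);
  while (!stop_requested_) {
    // Block until at least one task exists.
    while (!stop_requested_ && task_queue_.empty()) {
      new_task_scheduled_.wait(lock);
    }
    while (!stop_requested_ && !task_queue_.empty()) {
      // Copy the earliest deadline by value; no reference into the map
      // survives across the wait.
      std::chrono::system_clock::time_point current_timeout =
          task_queue_.begin()->first;
      if (new_task_scheduled_.wait_until(lock, current_timeout) ==
          std::cv_status::timeout) {
        break;  // the front task is due
      }
      // Woken before the deadline (new task or spurious wakeup): loop and
      // re-read the deadline, picking up any earlier task just scheduled.
    }
    // ... pop and run the due task under the lock, as in the existing
    // serviceQueue() body ...
  }
}
```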
We're not updating current_timeout_ within the loop; could that be an issue? What if someone adds an earlier task to the scheduler?
Yes, that might be an issue. I don't know how we can correctly update the timeout for all threads when a new task is added. Maybe re-set it in the loop and do a notify_all on insert?
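A sketch of that idea; the signature, the Function type, and the member names are assumed from context, and this is not the PR's final code:

```cpp
void Scheduler::schedule(Function f, std::chrono::system_clock::time_point when) {
  {
    std::lock_guard<std::mutex> lock(new_task_mutex_);
    task_queue_.insert(std::make_pair(when, f));
    // Refresh the shared deadline by value while holding the lock; workers
    // wait on this member, never on a reference into the map.
    current_timeout_ = task_queue_.begin()->first;
  }
  // Wake every worker so each one re-arms wait_until against the (possibly
  // earlier) current_timeout_. notify_one would update only one worker.
  new_task_scheduled_.notify_all();
}
```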
```diff
 while (!stop_requested_ && !task_queue_.empty() &&
-       new_task_scheduled_.wait_until(lock, task_queue_.begin()->first) != std::cv_status::timeout) {
+       new_task_scheduled_.wait_until(lock, current_timeout_) != std::cv_status::timeout) {
+  boost::thread::yield();
```
Not sure why we need to call yield() here?
To give up our CPU time slice, since we are going to wait again.
```diff
@@ -43,6 +45,10 @@ void Scheduler::serviceQueue() {
   Function f = task_queue_.begin()->second;
   task_queue_.erase(task_queue_.begin());

+  if (!task_queue_.empty()) {
+    current_timeout_ = task_queue_.begin()->first;
```
We wouldn't need to set current_timeout_ here, am I right?
I'm starting to think we should still set it for the current worker, but also signal the change to all other workers so they can update their timeout. But perhaps I'm overthinking it?
I see
@Trisfald I ran an experiment to see what happens inside Scheduler in the example you mentioned before (two threads accessing the same begin()->first reference) and I don't see any issue there. I have a branch in my local repo: master...jcague:test_scheduler_unprotected_access. I added some logs to see exactly what happens in those cases and found no issues. It seems like there's no access to an invalid reference in any case. Can you check whether it's the same execution path you thought about?
@jcague I ran your experiment a bunch of times (with 1 task added to the queue for simplicity) and got this output:
This is the problematic execution path I was talking about. Now everything works correctly (almost?) every time. In fact, I've seen a segfault in Scheduler only once since I started working with Licode (some others might have gone under the radar). I think the problem is between step 4 and 5. My std library's implementation of wait_until has this piece of code:
However, it's possible I'm totally mistaken and that crash happened for another reason altogether.
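The quoted stdlib snippet isn't preserved above; the general hazard it points at can be paraphrased like this (illustrative only, not any specific implementation's code):

```cpp
// wait_until receives the deadline by const reference:
//
//   template <class Clock, class Duration>
//   std::cv_status wait_until(
//       std::unique_lock<std::mutex>& lock,
//       const std::chrono::time_point<Clock, Duration>& abs_time);
//
// An implementation may re-read abs_time after the lock has been released
// and reacquired, for example to compare it against Clock::now() when
// deciding whether a wakeup was a real timeout. If abs_time aliases
// task_queue_.begin()->first and another worker erased that node in the
// meantime, the re-read dereferences freed memory.
```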
This issue was finally fixed by #1345 |
Description
This PR fixes a concurrency issue in the Scheduler component. Passing a reference to task_queue_.begin()->first to wait_until is problematic because reads of the timeout aren't synchronized with the writes happening in Scheduler::schedule.
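Condensed, the change looks like this (a summary of the review diffs above, not the exact final patch):

```cpp
// Before: wait_until receives a reference into the map's key, which another
// thread can invalidate by erasing the front node.
new_task_scheduled_.wait_until(lock, task_queue_.begin()->first);

// After: the deadline is copied out of the map under the lock, so the wait
// never holds a reference into task_queue_.
current_timeout_ = task_queue_.begin()->first;  // std::chrono::system_clock::time_point
new_task_scheduled_.wait_until(lock, current_timeout_);
```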
- [ ] It needs and includes Unit Tests

Changes in Client or Server public APIs
- [ ] It includes documentation for these changes in /doc.