-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Complete FASTER sessions before releasing them during query processing #397
Conversation
|
||
int? pageLimit = pageSize > 0 ? pageSize : null; | ||
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"query {queryId} attempt {attempt:o} enumeration from {queryEvent.ContinuationToken} with pageLimit={(pageLimit.HasValue ? pageLimit.ToString() : "none")} timeBudget={timeBudget}"); | ||
Stopwatch stopwatch = Stopwatch.StartNew(); | ||
using var pageCapacity = new SemaphoreSlim(pageLimit.HasValue ? pageLimit.Value : 100); | ||
|
||
var channel = Channel.CreateBounded<(bool last, ValueTask<FasterKV<Key, Value>.ReadAsyncResult<EffectTracker, Output, object>> responseTask)>(200); | ||
using var leftToFill = new SemaphoreSlim(pageLimit.HasValue ? pageLimit.Value : 100); | ||
using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(this.partition.ErrorHandler.Token); | ||
var cancellationToken = cancellationTokenSource.Token; | ||
|
||
Task readIssueLoop = Task.Run(ReadIssueLoop); | ||
async Task ReadIssueLoop() | ||
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"query {queryId} attempt {attempt:o} enumeration from {queryEvent.ContinuationToken} with pageLimit={(pageLimit.HasValue ? pageLimit.ToString() : "none")} timeBudget={timeBudget}"); | ||
Stopwatch stopwatch = Stopwatch.StartNew(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here I'm just re-ordering some variables, to help with readability (it's a long method, I'm trying to group related things together).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This overall looks safe to me. You have moved the Complete
call from just after channel read to immediately before channel write. There is no other code (of this repo) between those two points. But more importantly, you complete the FASTER session before the finally block runs, returning the session for another to take.
I can see how it was a race condition before
I'm going temporarily add a suffix to this PR's version to try and create a private release |
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix> | ||
<VersionSuffix></VersionSuffix> | ||
<VersionSuffix>clientQueryFix.1</VersionSuffix> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend just using preview
. It is fine to publicly release a change like this as preview.
I can't think of a good reason, perhaps performance was a concern, yes, but I don't think it really matters since from what I understand calling
looks fine to me. |
Wonderful. If the diff looks good (it ended up looked weirder than anticipated after I indented everything in a |
…rite into dajusto/prevent-indexOutOfRange-warnings
@sebastianburckhardt: can I get a review from you here? I'd like to get this in, if it looks good, on the upcoming release. We're hoping to release sometime next week |
finally | ||
{ | ||
cancellationTokenSource.Cancel(); | ||
await produceFasterReadsTask; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For consistency this await should use ConfigureAwait(false)
also, I think.
And perhaps we could add a ReportProgress
after this await? So we can detect cases where the await hangs or where the finally block does not get called (finally block in method with yield
gets called only if caller correctly disposes enumerator).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the ConfigureAwait(false)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And added the ReportProgress on my last commit
done: | ||
yield break; | ||
} | ||
finally |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a specific reason you added a try/finally wrapper? I suppose it is for handling additional places where exceptions could be thrown that are not already caught?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's precisely right. I just want to make sure there's no usages of the semaphore after it's been disposed and, to do that, I need to make sure the producer task is awaited before we exit this method (as after we exit, the semaphore will be disposed)
Should fix: #383
Context
Some customers are reporting large numbers of FASTER warnings of the following form:
The stack trace here suggests this is occurring when processing DF client queries (like getting the status of an instanceId).
@TedHartMS suspects this error is occurring because two threads may be using the same FASTER session at the same time.
We originally thought this could not be the case as we have tight concurrency control over FASTER sessions in
ReadOnQuerySessionAsync
(notice the use ofConcurrentBag
to obtain sessionIDs in a thread-safe way, and the semaphore):durabletask-netherite/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
Lines 50 to 71 in f8b5634
However, Ted called out that we're not calling
.Complete()
on that session's result before exiting that method, which means that the FASTER session may be obtained through theConcurrentBag
before FASTER is done cleaning up it's previous execution data, leading to the exception reported above. Today, this.Complete()
method is called later on, in the caller of theReadOnQuerySessionAsync
, here:durabletask-netherite/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
Lines 1055 to 1056 in f8b5634
This PR
This PR attempts to call refactor the code in
ReadOnQuerySessionAsync
and it's caller so that.Complete
is called inReadOnQuerySessionAsync
itself, hopefully preventing the exception.It also renames some variables in relevant methods to make them, in my opinion, easier to read. I also added several comments to help future maintainers understand the logic. This is a subjective change, and I'm open to feedback.
Request for reviewiers
I worry that there may be a reason why we weren't calling
.Complete()
inReadOnQuerySessionAsync
: most likely due to performance reasons? I'd appreciate a focus on possible performance regressions.I also notice the caller of
ReadOnQuerySessionAsync
follows a "producer-consumer" pattern, where the producer issues FASTER read requests (throughReadOnQuerySessionAsync
) and the consumer calls.Complete()
on them. Now, the producer calls.Complete()
and I worry I may have broken some underlying assumptions of that design. Please let me know if that seems true to you all.