Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Complete FASTER sessions before releasing them during query processing #397

Merged
merged 7 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>5</MinorVersion>
<PatchVersion>1</PatchVersion>
<PatchVersion>2</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<VersionSuffix>clientQueryFix.1</VersionSuffix>
Copy link
Member

@jviau jviau May 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend just using preview. It is fine to publicly release a change like this as preview.

<AssemblyVersion>$(MajorVersion).0.0.0</AssemblyVersion>
<BuildSuffix Condition="'$(GITHUB_RUN_NUMBER)' != ''">.$(GITHUB_RUN_NUMBER)</BuildSuffix>
<FileVersion>$(VersionPrefix)$(BuildSuffix)</FileVersion>
Expand Down
4 changes: 2 additions & 2 deletions src/DurableTask.Netherite/DurableTask.Netherite.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>5</MinorVersion>
<PatchVersion>1</PatchVersion>
<PatchVersion>2</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<VersionSuffix>clientQueryFix.1</VersionSuffix>
<AssemblyVersion>$(MajorVersion).0.0.0</AssemblyVersion>
<BuildSuffix Condition="'$(GITHUB_RUN_NUMBER)' != ''">.$(GITHUB_RUN_NUMBER)</BuildSuffix>
<FileVersion>$(VersionPrefix)$(BuildSuffix)</FileVersion>
Expand Down
67 changes: 40 additions & 27 deletions src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,20 @@ readonly ClientSession<Key, Value, EffectTracker, Output, object, IFunctions<Key
static readonly SemaphoreSlim availableQuerySessions = new SemaphoreSlim(queryParallelism);
readonly ConcurrentBag<int> idleQuerySessions = new ConcurrentBag<int>(Enumerable.Range(0, queryParallelism));

async ValueTask<FasterKV<Key, Value>.ReadAsyncResult<EffectTracker, Output, object>> ReadOnQuerySessionAsync(string instanceId, CancellationToken cancellationToken)
async ValueTask<(Status, Output)> ReadWithFasterAsync(string instanceId, CancellationToken cancellationToken)
{
await availableQuerySessions.WaitAsync();
try
{
bool success = this.idleQuerySessions.TryTake(out var session);
bool success = this.idleQuerySessions.TryTake(out int session);
this.partition.Assert(success, "available sessions must be larger than or equal to semaphore count");
try
{
// We need to call `.Complete.` before releasing the session, or else another thread may pick up the session and request a different IO operation,
// which can cause FASTER errors.
var result = await this.querySessions[session].ReadAsync(TrackedObjectKey.Instance(instanceId), token: cancellationToken).ConfigureAwait(false);
return result;
(Status status, Output output) = result.Complete();
return (status, output);
}
finally
{
Expand Down Expand Up @@ -950,45 +953,56 @@ public override ValueTask RemoveKeys(IEnumerable<TrackedObjectKey> keys)
async IAsyncEnumerable<(string,OrchestrationState)> QueryEnumeratedStates(
EffectTracker effectTracker,
PartitionQueryEvent queryEvent,
IEnumerator<string> enumerator,
IEnumerator<string> queryRequests,
int pageSize,
TimeSpan timeBudget,
DateTime attempt
)
{
var instanceQuery = queryEvent.InstanceQuery;
string queryId = queryEvent.EventIdString;

int? pageLimit = pageSize > 0 ? pageSize : null;
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"query {queryId} attempt {attempt:o} enumeration from {queryEvent.ContinuationToken} with pageLimit={(pageLimit.HasValue ? pageLimit.ToString() : "none")} timeBudget={timeBudget}");
Stopwatch stopwatch = Stopwatch.StartNew();
using var pageCapacity = new SemaphoreSlim(pageLimit.HasValue ? pageLimit.Value : 100);

var channel = Channel.CreateBounded<(bool last, ValueTask<FasterKV<Key, Value>.ReadAsyncResult<EffectTracker, Output, object>> responseTask)>(200);
using var leftToFill = new SemaphoreSlim(pageLimit.HasValue ? pageLimit.Value : 100);
using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(this.partition.ErrorHandler.Token);
var cancellationToken = cancellationTokenSource.Token;

Task readIssueLoop = Task.Run(ReadIssueLoop);
async Task ReadIssueLoop()
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"query {queryId} attempt {attempt:o} enumeration from {queryEvent.ContinuationToken} with pageLimit={(pageLimit.HasValue ? pageLimit.ToString() : "none")} timeBudget={timeBudget}");
Stopwatch stopwatch = Stopwatch.StartNew();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here I'm just re-ordering some variables, to help with readability (it's a long method, I'm trying to group related things together).


// We have 'producer-consumer' pattern here, the producer issues requests to FASTER, and the consumer reads the responses.
// The channel mediates the communication between producer and consumer.
var channel = Channel.CreateBounded<(bool isDone, ValueTask<(Status status, Output output)> readTask)>(200);

// Producer loop
Task produceFasterReadsTask = Task.Run(ProduceFasterReadRequests);
async Task ProduceFasterReadRequests()
{
try
{
while (enumerator.MoveNext())
// for each query, we issue a read command to FASTER, to be consumed in order by the reader
while (queryRequests.MoveNext())
{
if ((!string.IsNullOrEmpty(instanceQuery?.InstanceIdPrefix) && !enumerator.Current.StartsWith(instanceQuery.InstanceIdPrefix))
|| (instanceQuery.ExcludeEntities && DurableTask.Core.Common.Entities.IsEntityInstance(enumerator.Current)))
if ((!string.IsNullOrEmpty(instanceQuery?.InstanceIdPrefix) && !queryRequests.Current.StartsWith(instanceQuery.InstanceIdPrefix))
|| (instanceQuery.ExcludeEntities && DurableTask.Core.Common.Entities.IsEntityInstance(queryRequests.Current)))
{
// the instance does not match the prefix
continue;
}

await leftToFill.WaitAsync(cancellationToken);
// Ensure there's still capacity in this query's result page, and in the channel (as it's bounded/limited)
await pageCapacity.WaitAsync(cancellationToken);
await channel.Writer.WaitToWriteAsync(cancellationToken).ConfigureAwait(false);
var readTask = this.ReadOnQuerySessionAsync(enumerator.Current, cancellationToken);
await channel.Writer.WriteAsync((false, readTask), cancellationToken).ConfigureAwait(false);

// Issue read command to FASTER (key'ed by the instanceId), and write them in the channel to be consumed by the 'reader'
var readTask = this.ReadWithFasterAsync(queryRequests.Current, cancellationToken);
await channel.Writer.WriteAsync((isDone: false, readTask), cancellationToken).ConfigureAwait(false);
}

await channel.Writer.WriteAsync((true, default), cancellationToken).ConfigureAwait(false); // marks end of index
channel.Writer.Complete();
// Notify reader that we're done processing requests
await channel.Writer.WriteAsync((isDone: true, default), cancellationToken).ConfigureAwait(false); // marks end of index
channel.Writer.Complete(); // notify reader to stop waiting for more data
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"query {queryId} attempt {attempt:o} enumeration finished because it reached end");
}
catch (OperationCanceledException)
Expand Down Expand Up @@ -1023,11 +1037,12 @@ void ReportProgress(string status)

ReportProgress("start");

// Start of consumer loop
while (await channel.Reader.WaitToReadAsync(this.partition.ErrorHandler.Token).ConfigureAwait(false))
{
while (channel.Reader.TryRead(out var item))
{
if (item.last)
if (item.isDone)
{
ReportProgress("completed");
yield return (null, null);
Expand All @@ -1050,16 +1065,14 @@ void ReportProgress(string status)

try
{
var response = await item.responseTask.ConfigureAwait(false);

(Status status, Output output) = response.Complete();
(Status status, Output output) = await item.readTask.ConfigureAwait(false);

scanned++;

if (status.NotFound)
{
// because we are running concurrently, the index can be out of sync with the actual store
leftToFill.Release();
pageCapacity.Release();
continue;
}

Expand Down Expand Up @@ -1100,7 +1113,7 @@ void ReportProgress(string status)
if (orchestrationState != null && instanceQuery.Matches(orchestrationState))
{
matched++;
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"match instance {enumerator.Current}");
this.partition.EventDetailTracer?.TraceEventProcessingDetail($"match instance {queryRequests.Current}");
yield return (position, orchestrationState);

if (pageLimit.HasValue)
Expand All @@ -1113,12 +1126,12 @@ void ReportProgress(string status)
}
else
{
leftToFill.Release();
pageCapacity.Release();
}
}
else
{
leftToFill.Release();
pageCapacity.Release();
}
}
}
Expand All @@ -1129,7 +1142,7 @@ void ReportProgress(string status)

done:
cancellationTokenSource.Cancel();
await readIssueLoop;
await produceFasterReadsTask;
yield break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>5</MinorVersion>
<PatchVersion>1</PatchVersion>
<PatchVersion>2</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<VersionSuffix>clientQueryFix.1</VersionSuffix>
<AssemblyVersion>$(MajorVersion).0.0.0</AssemblyVersion>
<BuildSuffix Condition="'$(GITHUB_RUN_NUMBER)' != ''">.$(GITHUB_RUN_NUMBER)</BuildSuffix>
<FileVersion>$(VersionPrefix)$(BuildSuffix)</FileVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;

// This must be updated when updating the version of the package
[assembly: ExtensionInformation("Microsoft.Azure.DurableTask.Netherite.AzureFunctions", "1.5.0", true)]
[assembly: ExtensionInformation("Microsoft.Azure.DurableTask.Netherite.AzureFunctions", "1.5.2-clientQueryFix.1", true)]
Loading