Skip to content

Commit

Permalink
fix culling of waiters in instance state (#93)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianburckhardt authored Dec 22, 2021
1 parent 3989fb0 commit 266c31d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace DurableTask.Netherite
{
using DurableTask.Core;
using System;
using System.Runtime.Serialization;
using System.Text;

Expand All @@ -22,6 +23,9 @@ class WaitRequestReceived : ClientRequestEventWithPrefetch
[IgnoreDataMember]
public override string TracedInstanceId => this.InstanceId;

[DataMember]
public DateTime Timestamp { get; set; }

[IgnoreDataMember]
public WaitResponseReceived ResponseToSend { get; set; } // used to communicate response to ClientState

Expand Down
1 change: 1 addition & 0 deletions src/DurableTask.Netherite/OrchestrationService/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ public async Task<OrchestrationState> WaitForOrchestrationAsync(
RequestId = Interlocked.Increment(ref this.SequenceNumber),
InstanceId = instanceId,
ExecutionId = executionId,
Timestamp = DateTime.UtcNow,
TimeoutUtc = this.GetTimeoutBucket(timeout),
};

Expand Down
30 changes: 22 additions & 8 deletions src/DurableTask.Netherite/PartitionState/InstanceState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,36 @@ public override void Process(CreationRequestReceived creationRequestReceived, Ef
};
}

void CullWaiters(DateTime threshold)
{
// remove all waiters whose timeout is before the threshold
if (this.Waiters.Any(request => request.TimeoutUtc <= threshold))
{
this.Waiters = this.Waiters
.Where(request => request.TimeoutUtc > threshold)
.ToList();
}
}

public override void Process(BatchProcessed evt, EffectTracker effects)
{
// update the state of an orchestration
this.OrchestrationState = evt.State;

// if the orchestration is complete, notify clients that are waiting for it
if (this.Waiters != null && WaitRequestReceived.SatisfiesWaitCondition(this.OrchestrationState))
if (this.Waiters != null)
{
// we do not need effects.Add(TrackedObjectKey.Outbox) because it has already been added by SessionsState
evt.ResponsesToSend = this.Waiters.Select(request => request.CreateResponse(this.OrchestrationState)).ToList();
if (WaitRequestReceived.SatisfiesWaitCondition(this.OrchestrationState))
{
// we do not need effects.Add(TrackedObjectKey.Outbox) because it has already been added by SessionsState
evt.ResponsesToSend = this.Waiters.Select(request => request.CreateResponse(this.OrchestrationState)).ToList();

this.Waiters = null;
this.Waiters = null;
}
else
{
this.CullWaiters(evt.Timestamp);
}
}
}

Expand All @@ -111,10 +128,7 @@ public override void Process(WaitRequestReceived evt, EffectTracker effects)
}
else
{
// cull the list of waiters to remove requests that have already timed out
this.Waiters = this.Waiters
.Where(request => request.TimeoutUtc > DateTime.UtcNow)
.ToList();
this.CullWaiters(evt.Timestamp);
}

this.Waiters.Add(evt);
Expand Down

0 comments on commit 266c31d

Please sign in to comment.