diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/WaitRequestReceived.cs b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/WaitRequestReceived.cs index ef2fad7c..e62b538e 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/WaitRequestReceived.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/WaitRequestReceived.cs @@ -4,6 +4,7 @@ namespace DurableTask.Netherite { using DurableTask.Core; + using System; using System.Runtime.Serialization; using System.Text; @@ -22,6 +23,9 @@ class WaitRequestReceived : ClientRequestEventWithPrefetch [IgnoreDataMember] public override string TracedInstanceId => this.InstanceId; + [DataMember] + public DateTime Timestamp { get; set; } + [IgnoreDataMember] public WaitResponseReceived ResponseToSend { get; set; } // used to communicate response to ClientState diff --git a/src/DurableTask.Netherite/OrchestrationService/Client.cs b/src/DurableTask.Netherite/OrchestrationService/Client.cs index 2bbde310..6dac3e7e 100644 --- a/src/DurableTask.Netherite/OrchestrationService/Client.cs +++ b/src/DurableTask.Netherite/OrchestrationService/Client.cs @@ -400,6 +400,7 @@ public async Task WaitForOrchestrationAsync( RequestId = Interlocked.Increment(ref this.SequenceNumber), InstanceId = instanceId, ExecutionId = executionId, + Timestamp = DateTime.UtcNow, TimeoutUtc = this.GetTimeoutBucket(timeout), }; diff --git a/src/DurableTask.Netherite/PartitionState/InstanceState.cs b/src/DurableTask.Netherite/PartitionState/InstanceState.cs index 04b5e427..874c5d6d 100644 --- a/src/DurableTask.Netherite/PartitionState/InstanceState.cs +++ b/src/DurableTask.Netherite/PartitionState/InstanceState.cs @@ -80,6 +80,16 @@ public override void Process(CreationRequestReceived creationRequestReceived, Ef }; } + void CullWaiters(DateTime threshold) + { + // remove all waiters whose timeout is before the threshold + if (this.Waiters.Any(request => request.TimeoutUtc <= threshold)) + { + this.Waiters = this.Waiters + .Where(request => request.TimeoutUtc > threshold) + .ToList(); + } + } public override void Process(BatchProcessed evt, EffectTracker effects) { @@ -87,12 +97,19 @@ public override void Process(BatchProcessed evt, EffectTracker effects) this.OrchestrationState = evt.State; // if the orchestration is complete, notify clients that are waiting for it - if (this.Waiters != null && WaitRequestReceived.SatisfiesWaitCondition(this.OrchestrationState)) + if (this.Waiters != null) { - // we do not need effects.Add(TrackedObjectKey.Outbox) because it has already been added by SessionsState - evt.ResponsesToSend = this.Waiters.Select(request => request.CreateResponse(this.OrchestrationState)).ToList(); + if (WaitRequestReceived.SatisfiesWaitCondition(this.OrchestrationState)) + { + // we do not need effects.Add(TrackedObjectKey.Outbox) because it has already been added by SessionsState + evt.ResponsesToSend = this.Waiters.Select(request => request.CreateResponse(this.OrchestrationState)).ToList(); - this.Waiters = null; + this.Waiters = null; + } + else + { + this.CullWaiters(evt.Timestamp); + } } } @@ -111,10 +128,7 @@ public override void Process(WaitRequestReceived evt, EffectTracker effects) } else { - // cull the list of waiters to remove requests that have already timed out - this.Waiters = this.Waiters - .Where(request => request.TimeoutUtc > DateTime.UtcNow) - .ToList(); + this.CullWaiters(evt.Timestamp); } this.Waiters.Add(evt);