diff --git a/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObject.cs b/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObject.cs index 2f9c3b6b..061acf10 100644 --- a/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObject.cs +++ b/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObject.cs @@ -68,7 +68,7 @@ public virtual void OnFirstInitialization() /// Is automatically called on all singleton objects after recovery. Typically used to /// restart pending activities, timers, tasks and the like. /// - public virtual void OnRecoveryCompleted() + public virtual void OnRecoveryCompleted(RecoveryCompleted evt) { } @@ -103,5 +103,6 @@ public virtual void Process(ActivityCompleted evt, EffectTracker tracker) { } public virtual void Process(OffloadDecision evt, EffectTracker tracker) { } public virtual void Process(SendConfirmed evt, EffectTracker tracker) { } public virtual void Process(TimerFired evt, EffectTracker tracker) { } + public virtual void Process(RecoveryCompleted evt, EffectTracker tracker) { } } } diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/Internal/RecoveryCompleted.cs b/src/DurableTask.Netherite/Events/PartitionEvents/Internal/RecoveryCompleted.cs new file mode 100644 index 00000000..cfc0d010 --- /dev/null +++ b/src/DurableTask.Netherite/Events/PartitionEvents/Internal/RecoveryCompleted.cs @@ -0,0 +1,68 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+ +namespace DurableTask.Netherite +{ + using System; + using System.Collections.Generic; + using System.Runtime.CompilerServices; + using System.Runtime.Serialization; + using System.Text; + using DurableTask.Core; + + [DataContract] + class RecoveryCompleted : PartitionUpdateEvent + { + [DataMember] + public long RecoveredPosition { get; set; } + + [DataMember] + public DateTime Timestamp { get; set; } + + [DataMember] + public string WorkerId { get; set; } + + [DataMember] + public int NumActivities { get; set; } + + [DataMember] + public int MaxActivityDequeueCount { get; set; } + + [DataMember] + public int NumSessions { get; set; } + + [DataMember] + public int MaxSessionDequeueCount { get; set; } + + [IgnoreDataMember] + public bool RequiresStateUpdate => (this.NumSessions + this.NumActivities) > 0; // orchestrations and activities must increment the dequeue count + + public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects) + { + trackedObject.Process(this, effects); + } + + protected override void ExtraTraceInformation(StringBuilder s) + { + s.Append(" RecoveredPosition="); + s.Append(this.RecoveredPosition); + s.Append(" NumActivities="); + s.Append(this.NumActivities); + s.Append(" MaxActivityDequeueCount="); + s.Append(this.MaxActivityDequeueCount); + s.Append(" NumSessions="); + s.Append(this.NumSessions); + s.Append(" MaxSessionDequeueCount="); + s.Append(this.MaxSessionDequeueCount); + } + + [IgnoreDataMember] + public override EventId EventId => EventId.MakePartitionInternalEventId($"Recovered-{this.WorkerId}-{this.Timestamp:o}"); + + public override void DetermineEffects(EffectTracker effects) + { + effects.Add(TrackedObjectKey.Activities); + effects.Add(TrackedObjectKey.Sessions); + } + } +} diff --git a/src/DurableTask.Netherite/OrchestrationService/ActivityWorkItem.cs b/src/DurableTask.Netherite/OrchestrationService/ActivityWorkItem.cs index 5684b541..ce0eabb2 100644 --- 
a/src/DurableTask.Netherite/OrchestrationService/ActivityWorkItem.cs +++ b/src/DurableTask.Netherite/OrchestrationService/ActivityWorkItem.cs @@ -7,9 +7,10 @@ namespace DurableTask.Netherite using System.Collections.Generic; using System.Diagnostics; using System.Text; + using System.Threading.Tasks; using DurableTask.Core; - class ActivityWorkItem : TaskActivityWorkItem + class ActivityWorkItem : TaskActivityWorkItem, TransportAbstraction.IDurabilityListener { public Partition Partition { get; set; } @@ -23,7 +24,10 @@ class ActivityWorkItem : TaskActivityWorkItem public double StartedAt { get; set; } - public ActivityWorkItem(Partition partition, long activityId, TaskMessage message, string originWorkItem) + // enforces that the activity cannot start executing before the issuing event is persisted + public readonly TaskCompletionSource WaitForDequeueCountPersistence; + + ActivityWorkItem(Partition partition, long activityId, TaskMessage message, string originWorkItem) { this.Partition = partition; this.OriginPartition = partition.PartitionFunction(message.OrchestrationInstance.InstanceId); @@ -34,8 +38,23 @@ public ActivityWorkItem(Partition partition, long activityId, TaskMessage messag this.TaskMessage = message; } + public ActivityWorkItem(Partition partition, long activityId, TaskMessage message, string originWorkItem, PartitionUpdateEvent filingEvent) + : this(partition, activityId, message, originWorkItem) + { + if (partition.Settings.PersistDequeueCountBeforeStartingWorkItem) + { + this.WaitForDequeueCountPersistence = new TaskCompletionSource(); + DurabilityListeners.Register(filingEvent, this); + } + } + public string WorkItemId => ActivitiesState.GetWorkItemId(this.Partition.PartitionId, this.ActivityId); public string ExecutionType => (this.Partition.PartitionId == this.OriginPartition ? 
"Local" : "Remote"); + + public void ConfirmDurable(Event evt) + { + this.WaitForDequeueCountPersistence.TrySetResult(null); + } } } diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs index 71f58b28..76c6da0d 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs @@ -608,6 +608,11 @@ async Task IOrchestrationService.LockNextTaskOrchestr if (nextOrchestrationWorkItem != null) { + if (nextOrchestrationWorkItem.MessageBatch.WaitForDequeueCountPersistence != null) + { + await nextOrchestrationWorkItem.MessageBatch.WaitForDequeueCountPersistence.Task; + } + nextOrchestrationWorkItem.MessageBatch.WaitingSince = null; this.workItemTraceHelper.TraceWorkItemStarted( @@ -816,6 +821,11 @@ async Task IOrchestrationService.LockNextTaskActivityWorkI if (nextActivityWorkItem != null) { + if (nextActivityWorkItem.WaitForDequeueCountPersistence != null) + { + await nextActivityWorkItem.WaitForDequeueCountPersistence.Task; + } + this.workItemTraceHelper.TraceWorkItemStarted( nextActivityWorkItem.Partition.PartitionId, WorkItemTraceHelper.WorkItemType.Activity, diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs index c708e9b0..1d283a61 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs @@ -161,6 +161,13 @@ public class NetheriteOrchestrationServiceSettings /// public bool PersistStepsFirst { get; set; } = false; + /// + /// If true, the start of work items is delayed until the dequeue count + /// is persisted. 
Defaults to false, which improves latency but means the + /// reported dequeue count may be lower than the actual dequeue count in some cases. + /// + public bool PersistDequeueCountBeforeStartingWorkItem { get; set; } = false; + /// /// Pack TaskMessages generated by a single work item for the same destination into a single event. /// diff --git a/src/DurableTask.Netherite/OrchestrationService/OrchestrationMessageBatch.cs b/src/DurableTask.Netherite/OrchestrationService/OrchestrationMessageBatch.cs index 8e6860bb..df223136 100644 --- a/src/DurableTask.Netherite/OrchestrationService/OrchestrationMessageBatch.cs +++ b/src/DurableTask.Netherite/OrchestrationService/OrchestrationMessageBatch.cs @@ -12,7 +12,7 @@ namespace DurableTask.Netherite using DurableTask.Core; using DurableTask.Core.History; - class OrchestrationMessageBatch : InternalReadEvent + class OrchestrationMessageBatch : InternalReadEvent, TransportAbstraction.IDurabilityListener { public string InstanceId; public long SessionId; @@ -24,9 +24,12 @@ class OrchestrationMessageBatch : InternalReadEvent public string WorkItemId; public double? 
WaitingSince; // measures time waiting to execute + // enforces that the orchestration work item cannot start executing before the issuing event is persisted + public readonly TaskCompletionSource WaitForDequeueCountPersistence; + public override EventId EventId => EventId.MakePartitionInternalEventId(this.WorkItemId); - public OrchestrationMessageBatch(string instanceId, SessionsState.Session session, Partition partition) + public OrchestrationMessageBatch(string instanceId, SessionsState.Session session, Partition partition, PartitionUpdateEvent filingEvent) { this.InstanceId = instanceId; this.SessionId = session.SessionId; @@ -41,14 +44,25 @@ public OrchestrationMessageBatch(string instanceId, SessionsState.Session sessio this.WorkItemId = SessionsState.GetWorkItemId(partition.PartitionId, this.SessionId, this.BatchStartPosition); this.WaitingSince = partition.CurrentTimeMs; - session.CurrentBatch = this; + session.CurrentBatch = this; + if (partition.Settings.PersistDequeueCountBeforeStartingWorkItem) + { + this.WaitForDequeueCountPersistence = new TaskCompletionSource(); + DurabilityListeners.Register(filingEvent, this); + } + partition.EventDetailTracer?.TraceEventProcessingDetail($"OrchestrationMessageBatch is prefetching instance={this.InstanceId} batch={this.WorkItemId}"); // continue when we have the history state loaded, which gives us the latest state and/or cursor partition.SubmitEvent(this); } + public void ConfirmDurable(Event evt) + { + this.WaitForDequeueCountPersistence.TrySetResult(null); + } + public IEnumerable<(TaskMessage, string)> TracedMessages { get diff --git a/src/DurableTask.Netherite/PartitionState/ActivitiesState.cs b/src/DurableTask.Netherite/PartitionState/ActivitiesState.cs index 269114f9..1eb99c47 100644 --- a/src/DurableTask.Netherite/PartitionState/ActivitiesState.cs +++ b/src/DurableTask.Netherite/PartitionState/ActivitiesState.cs @@ -59,17 +59,32 @@ public override void OnFirstInitialization() const double SMOOTHING_FACTOR = 0.1; - public
override void OnRecoveryCompleted() + public override void OnRecoveryCompleted(RecoveryCompleted evt) { - // reschedule work items - foreach (var pending in this.Pending) + if (this.LocalBacklog.Count > 0) { - this.Partition.EnqueueActivityWorkItem(new ActivityWorkItem(this.Partition, pending.Key, pending.Value.Message, pending.Value.OriginWorkItemId)); + this.ScheduleNextOffloadDecision(TimeSpan.Zero); } - if (this.LocalBacklog.Count > 0) + if (this.Pending.Count > 0) { - this.ScheduleNextOffloadDecision(TimeSpan.Zero); + evt.NumActivities = this.Pending.Count; + evt.MaxActivityDequeueCount = this.Pending.Values.Select(val => val.DequeueCount).Max() + 1; + } + } + + public override void Process(RecoveryCompleted evt, EffectTracker effects) + { + effects.Partition.Assert(this.Pending.Count == evt.NumActivities); + + foreach (var kvp in this.Pending) + { + kvp.Value.DequeueCount++; + + if (!effects.IsReplaying) + { + this.Partition.EnqueueActivityWorkItem(new ActivityWorkItem(this.Partition, kvp.Key, kvp.Value.Message, kvp.Value.OriginWorkItemId, evt)); + } } } @@ -173,11 +188,12 @@ public override void Process(BatchProcessed evt, EffectTracker effects) if (this.Pending.Count == 0 || this.EstimatedWorkItemQueueSize < this.WorkItemQueueLimit) { + activityInfo.DequeueCount++; this.Pending.Add(activityInfo.ActivityId, activityInfo); if (!effects.IsReplaying) { - this.Partition.EnqueueActivityWorkItem(new ActivityWorkItem(this.Partition, activityInfo.ActivityId, msg, evt.WorkItemId)); + this.Partition.EnqueueActivityWorkItem(new ActivityWorkItem(this.Partition, activityInfo.ActivityId, msg, evt.WorkItemId, evt)); } this.EstimatedWorkItemQueueSize++; @@ -213,11 +229,12 @@ public override void Process(ActivityTransferReceived evt, EffectTracker effects if (this.Pending.Count == 0 || this.EstimatedWorkItemQueueSize <= this.WorkItemQueueLimit) { + activityInfo.DequeueCount++; this.Pending.Add(activityInfo.ActivityId, activityInfo); if (!effects.IsReplaying) { - 
this.Partition.EnqueueActivityWorkItem(new ActivityWorkItem(this.Partition, activityInfo.ActivityId, msg.Item1, msg.Item2)); + this.Partition.EnqueueActivityWorkItem(new ActivityWorkItem(this.Partition, activityInfo.ActivityId, msg.Item1, msg.Item2, evt)); } this.EstimatedWorkItemQueueSize++; @@ -268,11 +285,12 @@ public override void Process(ActivityCompleted evt, EffectTracker effects) { if (this.TryGetNextActivity(out var activityInfo)) { + activityInfo.DequeueCount++; this.Pending.Add(activityInfo.ActivityId, activityInfo); if (!effects.IsReplaying) { - this.Partition.EnqueueActivityWorkItem(new ActivityWorkItem(this.Partition, activityInfo.ActivityId, activityInfo.Message, activityInfo.OriginWorkItemId)); + this.Partition.EnqueueActivityWorkItem(new ActivityWorkItem(this.Partition, activityInfo.ActivityId, activityInfo.Message, activityInfo.OriginWorkItemId, evt)); } this.EstimatedWorkItemQueueSize++; @@ -385,6 +403,9 @@ public class ActivityInfo [DataMember] public DateTime IssueTime; + + [DataMember] + public int DequeueCount; } } } diff --git a/src/DurableTask.Netherite/PartitionState/OutboxState.cs b/src/DurableTask.Netherite/PartitionState/OutboxState.cs index 2d459654..bcdc7101 100644 --- a/src/DurableTask.Netherite/PartitionState/OutboxState.cs +++ b/src/DurableTask.Netherite/PartitionState/OutboxState.cs @@ -25,7 +25,7 @@ class OutboxState : TrackedObject, TransportAbstraction.IDurabilityListener public override TrackedObjectKey Key => new TrackedObjectKey(TrackedObjectKey.TrackedObjectType.Outbox); - public override void OnRecoveryCompleted() + public override void OnRecoveryCompleted(RecoveryCompleted evt) { // resend all pending foreach (var kvp in this.Outbox) diff --git a/src/DurableTask.Netherite/PartitionState/PrefetchState.cs b/src/DurableTask.Netherite/PartitionState/PrefetchState.cs index 22441d8b..8a951eb8 100644 --- a/src/DurableTask.Netherite/PartitionState/PrefetchState.cs +++ b/src/DurableTask.Netherite/PartitionState/PrefetchState.cs @@ 
-22,7 +22,7 @@ class PrefetchState : TrackedObject [IgnoreDataMember] public override TrackedObjectKey Key => new TrackedObjectKey(TrackedObjectKey.TrackedObjectType.Prefetch); - public override void OnRecoveryCompleted() + public override void OnRecoveryCompleted(RecoveryCompleted evt) { // reissue prefetch tasks for what did not complete prior to crash/recovery foreach (var kvp in this.PendingPrefetches) diff --git a/src/DurableTask.Netherite/PartitionState/QueriesState.cs b/src/DurableTask.Netherite/PartitionState/QueriesState.cs index f595cb62..800691d8 100644 --- a/src/DurableTask.Netherite/PartitionState/QueriesState.cs +++ b/src/DurableTask.Netherite/PartitionState/QueriesState.cs @@ -22,7 +22,7 @@ class QueriesState : TrackedObject [IgnoreDataMember] public override TrackedObjectKey Key => new TrackedObjectKey(TrackedObjectKey.TrackedObjectType.Queries); - public override void OnRecoveryCompleted() + public override void OnRecoveryCompleted(RecoveryCompleted evt) { // reissue queries that did not complete prior to crash/recovery foreach (var kvp in this.PendingQueries) diff --git a/src/DurableTask.Netherite/PartitionState/SessionsState.cs b/src/DurableTask.Netherite/PartitionState/SessionsState.cs index 6aa192df..b2aa6113 100644 --- a/src/DurableTask.Netherite/PartitionState/SessionsState.cs +++ b/src/DurableTask.Netherite/PartitionState/SessionsState.cs @@ -33,6 +33,9 @@ internal class Session [DataMember] public List<(TaskMessage message, string originWorkItemId)> Batch { get; set; } + [DataMember] + public int DequeueCount { get; set; } + [DataMember] public bool ForceNewExecution { get; set; } @@ -48,20 +51,32 @@ internal class Session public static string GetWorkItemId(uint partition, long session, long position) => $"{partition:D2}S{session}P{position}"; - - public override void OnRecoveryCompleted() + public override void OnRecoveryCompleted(RecoveryCompleted evt) { - // start work items for all sessions - foreach (var kvp in this.Sessions) - { - new 
OrchestrationMessageBatch(kvp.Key, kvp.Value, this.Partition); - } - // handle all steps that were awaiting persistence foreach(var kvp in this.StepsAwaitingPersistence) { this.ConfirmDurable(kvp.Value); } + + if (this.Sessions.Count > 0) + { + evt.NumSessions = this.Sessions.Count; + evt.MaxSessionDequeueCount = this.Sessions.Values.Select(val => val.DequeueCount).Max() + 1; + } + } + + public override void Process(RecoveryCompleted evt, EffectTracker effects) + { + effects.Partition.Assert(evt.NumSessions == this.Sessions.Count); + + // restart work items for all sessions + foreach (var kvp in this.Sessions) + { + kvp.Value.DequeueCount++; + + new OrchestrationMessageBatch(kvp.Key, kvp.Value, this.Partition, evt); + } } public override void UpdateLoadInfo(PartitionLoadInfo info) @@ -97,7 +112,7 @@ public override string ToString() string GetSessionPosition(Session session) => $"{this.Partition.PartitionId:D2}S{session.SessionId}P{session.BatchStartPosition + session.Batch.Count}"; - void AddMessageToSession(TaskMessage message, string originWorkItemId, bool isReplaying) + void AddMessageToSession(TaskMessage message, string originWorkItemId, bool isReplaying, PartitionUpdateEvent filingEvent) { string instanceId = message.OrchestrationInstance.InstanceId; bool forceNewExecution = message.Event is ExecutionStartedEvent; @@ -122,6 +137,7 @@ void AddMessageToSession(TaskMessage message, string originWorkItemId, bool isRe SessionId = this.SequenceNumber++, Batch = new List<(TaskMessage,string)>(), BatchStartPosition = 0, + DequeueCount = 1, ForceNewExecution = forceNewExecution, }; @@ -134,12 +150,12 @@ void AddMessageToSession(TaskMessage message, string originWorkItemId, bool isRe if (!isReplaying) // during replay, we don't start work items until end of recovery { - new OrchestrationMessageBatch(instanceId, session, this.Partition); + new OrchestrationMessageBatch(instanceId, session, this.Partition, filingEvent); } } } - void AddMessagesToSession(string 
instanceId, string originWorkItemId, IEnumerable messages, bool isReplaying) + void AddMessagesToSession(string instanceId, string originWorkItemId, IEnumerable messages, bool isReplaying, PartitionUpdateEvent filingEvent) { this.Partition.Assert(!string.IsNullOrEmpty(originWorkItemId)); int? forceNewExecution = FindLastExecutionStartedEvent(messages); @@ -180,6 +196,7 @@ void AddMessagesToSession(string instanceId, string originWorkItemId, IEnumerabl SessionId = this.SequenceNumber++, Batch = new List<(TaskMessage,string)>(), BatchStartPosition = 0, + DequeueCount = 1, ForceNewExecution = forceNewExecution.HasValue, }; @@ -195,7 +212,7 @@ void AddMessagesToSession(string instanceId, string originWorkItemId, IEnumerabl if (!isReplaying) // we don't start work items until end of recovery { - new OrchestrationMessageBatch(instanceId, session, this.Partition); + new OrchestrationMessageBatch(instanceId, session, this.Partition, filingEvent); } } } @@ -221,39 +238,39 @@ public override void Process(TaskMessagesReceived evt, EffectTracker effects) foreach (var group in evt.TaskMessages .GroupBy(tm => tm.OrchestrationInstance.InstanceId)) { - this.AddMessagesToSession(group.Key, evt.WorkItemId, group, effects.IsReplaying); + this.AddMessagesToSession(group.Key, evt.WorkItemId, group, effects.IsReplaying, evt); } } public override void Process(RemoteActivityResultReceived evt, EffectTracker effects) { // queues task message (from another partition) in a new or existing session - this.AddMessageToSession(evt.Result, evt.WorkItemId, effects.IsReplaying); + this.AddMessageToSession(evt.Result, evt.WorkItemId, effects.IsReplaying, evt); } public override void Process(ClientTaskMessagesReceived evt, EffectTracker effects) { // queues task message (from a client) in a new or existing session var instanceId = evt.TaskMessages[0].OrchestrationInstance.InstanceId; - this.AddMessagesToSession(instanceId, evt.WorkItemId, evt.TaskMessages, effects.IsReplaying); + 
this.AddMessagesToSession(instanceId, evt.WorkItemId, evt.TaskMessages, effects.IsReplaying, evt); } public override void Process(TimerFired timerFired, EffectTracker effects) { // queues a timer fired message in a session - this.AddMessageToSession(timerFired.TaskMessage, timerFired.OriginWorkItemId, effects.IsReplaying); + this.AddMessageToSession(timerFired.TaskMessage, timerFired.OriginWorkItemId, effects.IsReplaying, timerFired); } public override void Process(ActivityCompleted activityCompleted, EffectTracker effects) { // queues an activity-completed message in a session - this.AddMessageToSession(activityCompleted.Response, activityCompleted.WorkItemId, effects.IsReplaying); + this.AddMessageToSession(activityCompleted.Response, activityCompleted.WorkItemId, effects.IsReplaying, activityCompleted); } public override void Process(CreationRequestReceived creationRequestReceived, EffectTracker effects) { // queues the execution started message - this.AddMessageToSession(creationRequestReceived.TaskMessage, creationRequestReceived.WorkItemId, effects.IsReplaying); + this.AddMessageToSession(creationRequestReceived.TaskMessage, creationRequestReceived.WorkItemId, effects.IsReplaying, creationRequestReceived); } public override void Process(DeletionRequestReceived deletionRequestReceived, EffectTracker effects) @@ -334,7 +351,7 @@ public override void Process(BatchProcessed evt, EffectTracker effects) { foreach (var group in evt.LocalMessages.GroupBy(tm => tm.OrchestrationInstance.InstanceId)) { - this.AddMessagesToSession(group.Key, evt.WorkItemId, group, effects.IsReplaying); + this.AddMessagesToSession(group.Key, evt.WorkItemId, group, effects.IsReplaying, evt); } } @@ -350,11 +367,12 @@ public override void Process(BatchProcessed evt, EffectTracker effects) effects.Partition.Assert(session.BatchStartPosition == evt.BatchStartPosition); session.Batch.RemoveRange(0, evt.BatchLength); session.BatchStartPosition += evt.BatchLength; + session.DequeueCount = 1; - 
this.StartNewBatchIfNeeded(session, effects, evt.InstanceId, effects.IsReplaying); + this.StartNewBatchIfNeeded(session, effects, evt.InstanceId, effects.IsReplaying, evt); } - void StartNewBatchIfNeeded(Session session, EffectTracker effects, string instanceId, bool inRecovery) + void StartNewBatchIfNeeded(Session session, EffectTracker effects, string instanceId, bool inRecovery, PartitionUpdateEvent filingEvent) { if (session.Batch.Count == 0) { @@ -366,7 +384,7 @@ void StartNewBatchIfNeeded(Session session, EffectTracker effects, string instan if (!inRecovery) // we don't start work items until end of recovery { // there are more messages. Start another work item. - new OrchestrationMessageBatch(instanceId, session, this.Partition); + new OrchestrationMessageBatch(instanceId, session, this.Partition, filingEvent); } } } diff --git a/src/DurableTask.Netherite/PartitionState/TimersState.cs b/src/DurableTask.Netherite/PartitionState/TimersState.cs index c1fb3a97..eef01385 100644 --- a/src/DurableTask.Netherite/PartitionState/TimersState.cs +++ b/src/DurableTask.Netherite/PartitionState/TimersState.cs @@ -30,7 +30,7 @@ public override string ToString() return $"Timers ({this.PendingTimers.Count} pending) next={this.SequenceNumber:D6}"; } - public override void OnRecoveryCompleted() + public override void OnRecoveryCompleted(RecoveryCompleted evt) { // restore the pending timers foreach (var kvp in this.PendingTimers) diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs b/src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs index 139f2f6e..48b2ecab 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs @@ -443,14 +443,29 @@ public async Task RestartThingsAtEndOfRecovery() { this.traceHelper.FasterProgress("Restarting tasks"); + // we use this event for updating dequeue counts when restarting sessions and/or activities + var 
recoveryCompletedEvent = new RecoveryCompleted() + { + PartitionId = this.partition.PartitionId, + RecoveredPosition = this.CommitLogPosition, + Timestamp= DateTime.UtcNow, + WorkerId = this.partition.Settings.WorkerId, + }; + + // restart pending activities, timers, work items etc. using (EventTraceContext.MakeContext(this.CommitLogPosition, string.Empty)) { foreach (var key in TrackedObjectKey.GetSingletons()) { var target = (TrackedObject)await this.store.ReadAsync(key, this.effectTracker).ConfigureAwait(false); - target.OnRecoveryCompleted(); + target.OnRecoveryCompleted(recoveryCompletedEvent); } } + + if (recoveryCompletedEvent.RequiresStateUpdate) + { + this.partition.SubmitEvent(recoveryCompletedEvent); + } } public async ValueTask ReplayUpdate(PartitionUpdateEvent partitionUpdateEvent)