Skip to content

Commit

Permalink
implement dequeue count for sessions and activities
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianburckhardt committed Dec 22, 2021
1 parent 3989fb0 commit 92151ad
Show file tree
Hide file tree
Showing 13 changed files with 215 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public virtual void OnFirstInitialization()
/// Is automatically called on all singleton objects after recovery. Typically used to
/// restart pending activities, timers, tasks and the like.
/// </summary>
public virtual void OnRecoveryCompleted()
public virtual void OnRecoveryCompleted(RecoveryCompleted evt)
{
}

Expand Down Expand Up @@ -103,5 +103,6 @@ public virtual void Process(ActivityCompleted evt, EffectTracker tracker) { }
public virtual void Process(OffloadDecision evt, EffectTracker tracker) { }
public virtual void Process(SendConfirmed evt, EffectTracker tracker) { }
public virtual void Process(TimerFired evt, EffectTracker tracker) { }
public virtual void Process(RecoveryCompleted evt, EffectTracker tracker) { }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Runtime.Serialization;
using System.Text;
using DurableTask.Core;

/// <summary>
/// Partition update event that records the completion of a recovery. It carries the
/// number of pending activities and sessions together with the maximal dequeue counts,
/// and is applied to the Activities and Sessions tracked objects.
/// </summary>
[DataContract]
class RecoveryCompleted : PartitionUpdateEvent
{
    /// <summary>The position that was recovered.</summary>
    [DataMember]
    public long RecoveredPosition { get; set; }

    /// <summary>Timestamp of this event; also part of the event id.</summary>
    [DataMember]
    public DateTime Timestamp { get; set; }

    /// <summary>Identifier of the worker; also part of the event id.</summary>
    [DataMember]
    public string WorkerId { get; set; }

    /// <summary>Number of pending activities at recovery time.</summary>
    [DataMember]
    public int NumActivities { get; set; }

    /// <summary>Largest dequeue count among the pending activities.</summary>
    [DataMember]
    public int MaxActivityDequeueCount { get; set; }

    /// <summary>Number of sessions at recovery time.</summary>
    [DataMember]
    public int NumSessions { get; set; }

    /// <summary>Largest dequeue count among the sessions.</summary>
    [DataMember]
    public int MaxSessionDequeueCount { get; set; }

    /// <summary>
    /// True if there is at least one session or activity, since orchestrations and
    /// activities must increment the dequeue count.
    /// </summary>
    [IgnoreDataMember]
    public bool RequiresStateUpdate
    {
        get { return (this.NumSessions + this.NumActivities) > 0; }
    }

    /// <summary>Identifies this event by worker id and round-trip formatted timestamp.</summary>
    [IgnoreDataMember]
    public override EventId EventId
    {
        get { return EventId.MakePartitionInternalEventId($"Recovered-{this.WorkerId}-{this.Timestamp:o}"); }
    }

    /// <summary>Declares the tracked objects affected by this event: activities first, then sessions.</summary>
    public override void DetermineEffects(EffectTracker effects)
    {
        effects.Add(TrackedObjectKey.Activities);
        effects.Add(TrackedObjectKey.Sessions);
    }

    /// <summary>Dispatches this event to the matching Process overload of the tracked object.</summary>
    public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
        => trackedObject.Process(this, effects);

    /// <summary>Appends the recovered position and the activity/session counters to the trace line.</summary>
    protected override void ExtraTraceInformation(StringBuilder s)
    {
        s.Append(" RecoveredPosition=").Append(this.RecoveredPosition);
        s.Append(" NumActivities=").Append(this.NumActivities);
        s.Append(" MaxActivityDequeueCount=").Append(this.MaxActivityDequeueCount);
        s.Append(" NumSessions=").Append(this.NumSessions);
        s.Append(" MaxSessionDequeueCount=").Append(this.MaxSessionDequeueCount);
    }
}
}
23 changes: 21 additions & 2 deletions src/DurableTask.Netherite/OrchestrationService/ActivityWorkItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ namespace DurableTask.Netherite
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using DurableTask.Core;

class ActivityWorkItem : TaskActivityWorkItem
class ActivityWorkItem : TaskActivityWorkItem, TransportAbstraction.IDurabilityListener
{
public Partition Partition { get; set; }

Expand All @@ -23,7 +24,10 @@ class ActivityWorkItem : TaskActivityWorkItem

public double StartedAt { get; set; }

public ActivityWorkItem(Partition partition, long activityId, TaskMessage message, string originWorkItem)
// enforces that the activity cannot start executing before the issuing event is persisted
public readonly TaskCompletionSource<object> WaitForDequeueCountPersistence;

ActivityWorkItem(Partition partition, long activityId, TaskMessage message, string originWorkItem)
{
this.Partition = partition;
this.OriginPartition = partition.PartitionFunction(message.OrchestrationInstance.InstanceId);
Expand All @@ -34,8 +38,23 @@ public ActivityWorkItem(Partition partition, long activityId, TaskMessage messag
this.TaskMessage = message;
}

/// <summary>
/// Creates an activity work item and, if the partition setting
/// PersistDequeueCountBeforeStartingWorkItem is enabled, registers it as a durability
/// listener on the filing event so that its start can be delayed until that event is persisted.
/// </summary>
/// <param name="partition">The partition that owns this work item.</param>
/// <param name="activityId">The id of the activity.</param>
/// <param name="message">The task message for the activity.</param>
/// <param name="originWorkItem">The id of the work item that issued the activity.</param>
/// <param name="filingEvent">The event whose persistence gates the start of this work item.</param>
public ActivityWorkItem(Partition partition, long activityId, TaskMessage message, string originWorkItem, PartitionUpdateEvent filingEvent)
    : this(partition, activityId, message, originWorkItem)
{
    if (partition.Settings.PersistDequeueCountBeforeStartingWorkItem)
    {
        // Run continuations asynchronously so that the code awaiting this task does not
        // execute inline on the thread that calls ConfirmDurable.
        this.WaitForDequeueCountPersistence = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
        DurabilityListeners.Register(filingEvent, this);
    }
}

public string WorkItemId => ActivitiesState.GetWorkItemId(this.Partition.PartitionId, this.ActivityId);

public string ExecutionType => (this.Partition.PartitionId == this.OriginPartition ? "Local" : "Remote");

/// <summary>
/// Durability callback: releases whoever is waiting on
/// <see cref="WaitForDequeueCountPersistence"/>, if such a wait was set up.
/// </summary>
/// <param name="evt">The event that was confirmed durable (not inspected here).</param>
public void ConfirmDurable(Event evt)
{
    // The completion source is only created when the persistence setting is enabled,
    // so tolerate a null field instead of throwing.
    this.WaitForDequeueCountPersistence?.TrySetResult(null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,11 @@ async Task<TaskOrchestrationWorkItem> IOrchestrationService.LockNextTaskOrchestr

if (nextOrchestrationWorkItem != null)
{
if (nextOrchestrationWorkItem.MessageBatch.WaitForDequeueCountPersistence != null)
{
await nextOrchestrationWorkItem.MessageBatch.WaitForDequeueCountPersistence.Task;
}

nextOrchestrationWorkItem.MessageBatch.WaitingSince = null;

this.workItemTraceHelper.TraceWorkItemStarted(
Expand Down Expand Up @@ -816,6 +821,11 @@ async Task<TaskActivityWorkItem> IOrchestrationService.LockNextTaskActivityWorkI

if (nextActivityWorkItem != null)
{
if (nextActivityWorkItem.WaitForDequeueCountPersistence != null)
{
await nextActivityWorkItem.WaitForDequeueCountPersistence.Task;
}

this.workItemTraceHelper.TraceWorkItemStarted(
nextActivityWorkItem.Partition.PartitionId,
WorkItemTraceHelper.WorkItemType.Activity,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,13 @@ public class NetheriteOrchestrationServiceSettings
/// </summary>
public bool PersistStepsFirst { get; set; } = false;

/// <summary>
/// If true, the start of work items is delayed until the dequeue count
/// is persisted. Defaults to false, which improves latency but means the
/// reported dequeue count may be lower than the actual dequeue count in some cases.
/// </summary>
public bool PersistDequeueCountBeforeStartingWorkItem { get; set; } = false;

/// <summary>
/// Pack TaskMessages generated by a single work item for the same destination into a single event.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace DurableTask.Netherite
using DurableTask.Core;
using DurableTask.Core.History;

class OrchestrationMessageBatch : InternalReadEvent
class OrchestrationMessageBatch : InternalReadEvent, TransportAbstraction.IDurabilityListener
{
public string InstanceId;
public long SessionId;
Expand All @@ -24,9 +24,12 @@ class OrchestrationMessageBatch : InternalReadEvent
public string WorkItemId;
public double? WaitingSince; // measures time waiting to execute

// enforces that the orchestration work item cannot start executing before the issuing event is persisted
public readonly TaskCompletionSource<object> WaitForDequeueCountPersistence;

public override EventId EventId => EventId.MakePartitionInternalEventId(this.WorkItemId);

public OrchestrationMessageBatch(string instanceId, SessionsState.Session session, Partition partition)
public OrchestrationMessageBatch(string instanceId, SessionsState.Session session, Partition partition, PartitionUpdateEvent filingEvent)
{
this.InstanceId = instanceId;
this.SessionId = session.SessionId;
Expand All @@ -41,14 +44,25 @@ public OrchestrationMessageBatch(string instanceId, SessionsState.Session sessio
this.WorkItemId = SessionsState.GetWorkItemId(partition.PartitionId, this.SessionId, this.BatchStartPosition);
this.WaitingSince = partition.CurrentTimeMs;

session.CurrentBatch = this;
session.CurrentBatch = this;

if (partition.Settings.PersistDequeueCountBeforeStartingWorkItem)
{
this.WaitForDequeueCountPersistence = new TaskCompletionSource<object>();
DurabilityListeners.Register(filingEvent, this);
}

partition.EventDetailTracer?.TraceEventProcessingDetail($"OrchestrationMessageBatch is prefetching instance={this.InstanceId} batch={this.WorkItemId}");

// continue when we have the history state loaded, which gives us the latest state and/or cursor
partition.SubmitEvent(this);
}

/// <summary>
/// Durability callback: releases whoever is waiting on
/// <see cref="WaitForDequeueCountPersistence"/>, if such a wait was set up.
/// </summary>
/// <param name="evt">The event that was confirmed durable (not inspected here).</param>
public void ConfirmDurable(Event evt)
{
    // The completion source is only created when the persistence setting is enabled,
    // so tolerate a null field instead of throwing.
    this.WaitForDequeueCountPersistence?.TrySetResult(null);
}

public IEnumerable<(TaskMessage, string)> TracedMessages
{
get
Expand Down
39 changes: 30 additions & 9 deletions src/DurableTask.Netherite/PartitionState/ActivitiesState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,32 @@ public override void OnFirstInitialization()

const double SMOOTHING_FACTOR = 0.1;

public override void OnRecoveryCompleted()
public override void OnRecoveryCompleted(RecoveryCompleted evt)
{
// reschedule work items
foreach (var pending in this.Pending)
if (this.LocalBacklog.Count > 0)
{
this.Partition.EnqueueActivityWorkItem(new ActivityWorkItem(this.Partition, pending.Key, pending.Value.Message, pending.Value.OriginWorkItemId));
this.ScheduleNextOffloadDecision(TimeSpan.Zero);
}

if (this.LocalBacklog.Count > 0)
if (this.Pending.Count > 0)
{
this.ScheduleNextOffloadDecision(TimeSpan.Zero);
evt.NumActivities = this.Pending.Count;
evt.MaxActivityDequeueCount = this.Pending.Values.Select(val => val.DequeueCount).Max() + 1;
}
}

/// <summary>
/// Applies a recovery-completed event: increments the dequeue count of every pending
/// activity and, unless the event is being replayed, enqueues a fresh work item for it.
/// </summary>
/// <param name="evt">The recovery event; its NumActivities must match the pending count.</param>
/// <param name="effects">The effect tracker, used for the assertion and the replay check.</param>
public override void Process(RecoveryCompleted evt, EffectTracker effects)
{
    effects.Partition.Assert(this.Pending.Count == evt.NumActivities);

    foreach (var entry in this.Pending)
    {
        var activityInfo = entry.Value;

        // the count is part of the persisted state, so it is updated during replay as well
        activityInfo.DequeueCount++;

        if (effects.IsReplaying)
        {
            continue;
        }

        var workItem = new ActivityWorkItem(this.Partition, entry.Key, activityInfo.Message, activityInfo.OriginWorkItemId, evt);
        this.Partition.EnqueueActivityWorkItem(workItem);
    }
}

Expand Down Expand Up @@ -173,11 +188,12 @@ public override void Process(BatchProcessed evt, EffectTracker effects)

if (this.Pending.Count == 0 || this.EstimatedWorkItemQueueSize < this.WorkItemQueueLimit)
{
activityInfo.DequeueCount++;
this.Pending.Add(activityInfo.ActivityId, activityInfo);

if (!effects.IsReplaying)
{
this.Partition.EnqueueActivityWorkItem(new ActivityWorkItem(this.Partition, activityInfo.ActivityId, msg, evt.WorkItemId));
this.Partition.EnqueueActivityWorkItem(new ActivityWorkItem(this.Partition, activityInfo.ActivityId, msg, evt.WorkItemId, evt));
}

this.EstimatedWorkItemQueueSize++;
Expand Down Expand Up @@ -213,11 +229,12 @@ public override void Process(ActivityTransferReceived evt, EffectTracker effects

if (this.Pending.Count == 0 || this.EstimatedWorkItemQueueSize <= this.WorkItemQueueLimit)
{
activityInfo.DequeueCount++;
this.Pending.Add(activityInfo.ActivityId, activityInfo);

if (!effects.IsReplaying)
{
this.Partition.EnqueueActivityWorkItem(new ActivityWorkItem(this.Partition, activityInfo.ActivityId, msg.Item1, msg.Item2));
this.Partition.EnqueueActivityWorkItem(new ActivityWorkItem(this.Partition, activityInfo.ActivityId, msg.Item1, msg.Item2, evt));
}

this.EstimatedWorkItemQueueSize++;
Expand Down Expand Up @@ -268,11 +285,12 @@ public override void Process(ActivityCompleted evt, EffectTracker effects)
{
if (this.TryGetNextActivity(out var activityInfo))
{
activityInfo.DequeueCount++;
this.Pending.Add(activityInfo.ActivityId, activityInfo);

if (!effects.IsReplaying)
{
this.Partition.EnqueueActivityWorkItem(new ActivityWorkItem(this.Partition, activityInfo.ActivityId, activityInfo.Message, activityInfo.OriginWorkItemId));
this.Partition.EnqueueActivityWorkItem(new ActivityWorkItem(this.Partition, activityInfo.ActivityId, activityInfo.Message, activityInfo.OriginWorkItemId, evt));
}

this.EstimatedWorkItemQueueSize++;
Expand Down Expand Up @@ -385,6 +403,9 @@ public class ActivityInfo

[DataMember]
public DateTime IssueTime;

[DataMember]
public int DequeueCount;
}
}
}
2 changes: 1 addition & 1 deletion src/DurableTask.Netherite/PartitionState/OutboxState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class OutboxState : TrackedObject, TransportAbstraction.IDurabilityListener

public override TrackedObjectKey Key => new TrackedObjectKey(TrackedObjectKey.TrackedObjectType.Outbox);

public override void OnRecoveryCompleted()
public override void OnRecoveryCompleted(RecoveryCompleted evt)
{
// resend all pending
foreach (var kvp in this.Outbox)
Expand Down
2 changes: 1 addition & 1 deletion src/DurableTask.Netherite/PartitionState/PrefetchState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class PrefetchState : TrackedObject
[IgnoreDataMember]
public override TrackedObjectKey Key => new TrackedObjectKey(TrackedObjectKey.TrackedObjectType.Prefetch);

public override void OnRecoveryCompleted()
public override void OnRecoveryCompleted(RecoveryCompleted evt)
{
// reissue prefetch tasks for what did not complete prior to crash/recovery
foreach (var kvp in this.PendingPrefetches)
Expand Down
2 changes: 1 addition & 1 deletion src/DurableTask.Netherite/PartitionState/QueriesState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class QueriesState : TrackedObject
[IgnoreDataMember]
public override TrackedObjectKey Key => new TrackedObjectKey(TrackedObjectKey.TrackedObjectType.Queries);

public override void OnRecoveryCompleted()
public override void OnRecoveryCompleted(RecoveryCompleted evt)
{
// reissue queries that did not complete prior to crash/recovery
foreach (var kvp in this.PendingQueries)
Expand Down
Loading

0 comments on commit 92151ad

Please sign in to comment.