Skip to content

Commit

Permalink
fix replay bug in activity scheduler, and add configuration setting (#51
Browse files Browse the repository at this point in the history
)
  • Loading branch information
sebastianburckhardt authored Jun 14, 2021
1 parent 1c0cc94 commit 7af8787
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Text;

/// <summary>
/// Settings for how activities .
/// </summary>
public enum ActivitySchedulerOptions
{
/// <summary>
/// All activities are scheduled on the same partition as the orchestration.
/// </summary>
Local,

/// <summary>
/// Activities are scheduled locally if possible, but backlog is offloaded periodically.
/// </summary>
PeriodicOffload,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ public class NetheriteOrchestrationServiceSettings
[JsonConverter(typeof(StringEnumConverter))]
public PartitionManagementOptions PartitionManagement { get; set; } = PartitionManagementOptions.EventProcessorHost;

/// <summary>
/// Gets or sets the activity scheduler option
/// </summary>
[JsonConverter(typeof(StringEnumConverter))]
public ActivitySchedulerOptions ActivityScheduler { get; set; } = ActivitySchedulerOptions.PeriodicOffload;

/// <summary>
/// Gets or sets a flag indicating whether to enable caching of execution cursors to avoid replay.
/// </summary>
Expand Down Expand Up @@ -150,7 +156,7 @@ public class NetheriteOrchestrationServiceSettings
public bool UseAlternateObjectStore { get; set; } = false;

/// <summary>
/// Forces steps to pe persisted before applying their effects, thus disabling all speculation.
/// Forces steps to pe persisted before applying their effects, disabling all pipelining.
/// </summary>
public bool PersistStepsFirst { get; set; } = false;

Expand Down
25 changes: 14 additions & 11 deletions src/DurableTask.Netherite/PartitionState/ActivitiesState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,14 @@ public override string ToString()

void ScheduleNextOffloadDecision(TimeSpan delay)
{
this.Partition.PendingTimers.Schedule(DateTime.UtcNow + delay, new OffloadDecision()
if (this.Partition.Settings.ActivityScheduler != ActivitySchedulerOptions.Local)
{
PartitionId = this.Partition.PartitionId,
Timestamp = DateTime.UtcNow + delay,
});
this.Partition.PendingTimers.Schedule(DateTime.UtcNow + delay, new OffloadDecision()
{
PartitionId = this.Partition.PartitionId,
Timestamp = DateTime.UtcNow + delay,
});
}
}

public bool TryGetNextActivity(out ActivityInfo activityInfo)
Expand Down Expand Up @@ -292,7 +295,7 @@ public void Process(OffloadDecision offloadDecisionEvent, EffectTracker effects)

// we are adding (nonpersisted) information to the event just as a way of passing it to the OutboxState
offloadDecisionEvent.DestinationPartitionId = target;
offloadDecisionEvent.OffloadedActivities = new List<(TaskMessage,string)>();
offloadDecisionEvent.OffloadedActivities = new List<(TaskMessage, string)>();

for (int i = 0; i < maxBatchsize; i++)
{
Expand All @@ -314,10 +317,10 @@ public void Process(OffloadDecision offloadDecisionEvent, EffectTracker effects)
if (!effects.IsReplaying)
{
this.Partition.EventTraceHelper.TracePartitionOffloadDecision(this.EstimatedLocalWorkItemLoad, this.Pending.Count, this.LocalBacklog.Count, this.QueuedRemotes.Count, reportedRemotes);
}

// try again relatively soon
this.ScheduleNextOffloadDecision(TimeSpan.FromMilliseconds(200));
// try again relatively soon
this.ScheduleNextOffloadDecision(TimeSpan.FromMilliseconds(200));
}
}
else
{
Expand All @@ -326,10 +329,10 @@ public void Process(OffloadDecision offloadDecisionEvent, EffectTracker effects)
if (!effects.IsReplaying)
{
this.Partition.EventTraceHelper.TracePartitionOffloadDecision(this.EstimatedLocalWorkItemLoad, this.Pending.Count, this.LocalBacklog.Count, this.QueuedRemotes.Count, reportedRemotes);
}

// there are no eligible recipients... try again in a while
this.ScheduleNextOffloadDecision(TimeSpan.FromSeconds(10));
// there are no eligible recipients... try again in a while
this.ScheduleNextOffloadDecision(TimeSpan.FromSeconds(10));
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions test/PerformanceTests/host.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,12 @@
"PersistStepsFirst": false,

// set this to "Scripted" to control the scenario with a partition script
// or to "ClientOnly" to run only the client
"PartitionManagement": "EventProcessorHost",

// set this to "Local" to disable the global activity distribution algorithm
"ActivityScheduler": "PeriodicOffload",

// The log level limits below control the production of log events by the various components.
// it limits production, not just consumption, of the events, so it can be used to prevent overheads.
// "Debug" is a reasonable setting, as it allows troubleshooting without impacting perf too much.
Expand Down
7 changes: 4 additions & 3 deletions test/PerformanceTests/series/host.neth-12-ls.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@
"PersistStepsFirst": false,

// set this to "Scripted" to control the scenario with a partition script
// or to "ClientOnly" to run only the client
"PartitionManagement": "EventProcessorHost",

// set this to false to disable activity offloading
"DistributeActivities": false,
// set this to "Local" to disable the global activity distribution algorithm
"ActivityScheduler": "PeriodicOffload",

// The log level limits below control the production of log events by the various components.
// it limits production, not just consumption, of the events, so it can be used to prevent overheads.
// "Debug" is a reasonable setting, as it allows troubleshooting without impacting perf too much.
Expand Down

0 comments on commit 7af8787

Please sign in to comment.