Skip to content

Commit

Permalink
Merge pull request #353 from microsoft/dev
Browse files Browse the repository at this point in the history
Promote dev to main for 1.4.2 release
  • Loading branch information
davidmrdavid authored Feb 16, 2024
2 parents f24f6de + 60c7d39 commit 7921c1e
Show file tree
Hide file tree
Showing 21 changed files with 385 additions and 255 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>4</MinorVersion>
<PatchVersion>1</PatchVersion>
<PatchVersion>2</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).0.0.0</AssemblyVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ public interface IPartitionErrorHandler
CancellationToken Token { get; }

/// <summary>
/// A place to subscribe (potentially non-instantaneous) cleanup actions that execute on a dedicated thread.
/// Adds a task to be executed after the partition has been terminated.
/// </summary>
event Action OnShutdown;
void AddDisposeTask(string name, TimeSpan timeout, Action action);

/// <summary>
/// A boolean indicating whether the partition is terminated.
Expand All @@ -35,9 +35,11 @@ public interface IPartitionErrorHandler
bool NormalTermination { get; }

/// <summary>
/// Wait for all termination operations to finish
/// Waits for the dispose tasks to complete, up to the specified time limit.
/// </summary>
Task<bool> WaitForTermination(TimeSpan timeout);
/// <param name="timeout">The maximum time to wait for</param>
/// <returns>true if all tasks completed within the timeout, false otherwise.</returns>
bool WaitForDisposeTasks(TimeSpan timeout);

/// <summary>
/// Error handling for the partition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ namespace DurableTask.Netherite
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.Serialization;

/// <summary>
/// Represents a key used to identify <see cref="TrackedObject"/> instances.
/// </summary>
struct TrackedObjectKey
struct TrackedObjectKey
{
public TrackedObjectType ObjectType;
public string InstanceId;
Expand Down Expand Up @@ -70,12 +71,12 @@ public static int Compare(ref TrackedObjectKey key1, ref TrackedObjectKey key2)

public class Comparer : IComparer<TrackedObjectKey>
{
public int Compare(TrackedObjectKey x, TrackedObjectKey y) => TrackedObjectKey.Compare(ref x, ref y);
public int Compare(TrackedObjectKey x, TrackedObjectKey y) => TrackedObjectKey.Compare(ref x, ref y);
}

public override int GetHashCode()
{
return (this.InstanceId?.GetHashCode() ?? 0) + (int) this.ObjectType;
return (this.InstanceId?.GetHashCode() ?? 0) + (int)this.ObjectType;
}

public override bool Equals(object obj)
Expand All @@ -97,55 +98,37 @@ public override bool Equals(object obj)

// convenient constructors for non-singletons

public static TrackedObjectKey History(string id) => new TrackedObjectKey()
{
ObjectType = TrackedObjectType.History,
InstanceId = id,
};
public static TrackedObjectKey Instance(string id) => new TrackedObjectKey()
{
ObjectType = TrackedObjectType.Instance,
InstanceId = id,
};
public static TrackedObjectKey History(string id) => new TrackedObjectKey()
{
ObjectType = TrackedObjectType.History,
InstanceId = id,
};
public static TrackedObjectKey Instance(string id) => new TrackedObjectKey()
{
ObjectType = TrackedObjectType.Instance,
InstanceId = id,
};

public static TrackedObject Factory(TrackedObjectKey key) => key.ObjectType switch
{
TrackedObjectType.Activities => new ActivitiesState(),
TrackedObjectType.Dedup => new DedupState(),
TrackedObjectType.Outbox => new OutboxState(),
TrackedObjectType.Reassembly => new ReassemblyState(),
TrackedObjectType.Sessions => new SessionsState(),
TrackedObjectType.Timers => new TimersState(),
TrackedObjectType.Prefetch => new PrefetchState(),
TrackedObjectType.Queries => new QueriesState(),
TrackedObjectType.Stats => new StatsState(),
TrackedObjectType.History => new HistoryState() { InstanceId = key.InstanceId },
TrackedObjectType.Instance => new InstanceState() { InstanceId = key.InstanceId },
_ => throw new ArgumentException("invalid key", nameof(key)),
};

public static IEnumerable<TrackedObjectKey> GetSingletons()
{
TrackedObjectType.Activities => new ActivitiesState(),
TrackedObjectType.Dedup => new DedupState(),
TrackedObjectType.Outbox => new OutboxState(),
TrackedObjectType.Reassembly => new ReassemblyState(),
TrackedObjectType.Sessions => new SessionsState(),
TrackedObjectType.Timers => new TimersState(),
TrackedObjectType.Prefetch => new PrefetchState(),
TrackedObjectType.Queries => new QueriesState(),
TrackedObjectType.Stats => new StatsState(),
TrackedObjectType.History => new HistoryState() { InstanceId = key.InstanceId },
TrackedObjectType.Instance => new InstanceState() { InstanceId = key.InstanceId },
_ => throw new ArgumentException("invalid key", nameof(key)),
};

public static IEnumerable<TrackedObjectKey> GetSingletons()
=> Enum.GetValues(typeof(TrackedObjectType)).Cast<TrackedObjectType>().Where(t => IsSingletonType(t)).Select(t => new TrackedObjectKey() { ObjectType = t });

public override string ToString()
public override string ToString()
=> this.InstanceId == null ? this.ObjectType.ToString() : $"{this.ObjectType}-{this.InstanceId}";

public void Deserialize(BinaryReader reader)
{
this.ObjectType = (TrackedObjectType) reader.ReadByte();
if (!IsSingletonType(this.ObjectType))
{
this.InstanceId = reader.ReadString();
}
}

public void Serialize(BinaryWriter writer)
{
writer.Write((byte) this.ObjectType);
if (!IsSingletonType(this.ObjectType))
{
writer.Write(this.InstanceId);
}
}
}
}
4 changes: 2 additions & 2 deletions src/DurableTask.Netherite/DurableTask.Netherite.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>4</MinorVersion>
<PatchVersion>1</PatchVersion>
<PatchVersion>2</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).0.0.0</AssemblyVersion>
Expand Down Expand Up @@ -56,7 +56,7 @@
<PackageReference Include="Microsoft.Azure.EventHubs.Processor" Version="4.3.2" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.2.3" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.16.0" />
<PackageReference Include="Microsoft.FASTER.Core" Version="2.0.16" />
<PackageReference Include="Microsoft.FASTER.Core" Version="2.0.23" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.15.1" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,10 @@ public class NetheriteOrchestrationServiceSettings

/// <summary>
/// Whether to checkpoint the current state of a partition when it is stopped. This improves recovery time but
/// lengthens shutdown time.
/// lengthens shutdown time and can cause memory pressure if many partitions are stopped at the same time,
/// for example if a host is shutting down.
/// </summary>
public bool TakeStateCheckpointWhenStoppingPartition { get; set; } = true;
public bool TakeStateCheckpointWhenStoppingPartition { get; set; } = false;

/// <summary>
/// A limit on how many bytes to append to the log before initiating a state checkpoint. The default is 20MB.
Expand Down Expand Up @@ -168,6 +169,11 @@ public class NetheriteOrchestrationServiceSettings
/// </summary>
public int PackPartitionTaskMessages { get; set; } = 100;

/// <summary>
/// Time limit for partition startup, in minutes.
/// </summary>
public int PartitionStartupTimeoutMinutes { get; set; } = 15;

/// <summary>
/// Allows attaching additional checkers and debuggers during testing.
/// </summary>
Expand Down
42 changes: 26 additions & 16 deletions src/DurableTask.Netherite/OrchestrationService/Partition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ partial class Partition : TransportAbstraction.IPartition
// A little helper property that allows us to conventiently check the condition for low-level event tracing
public EventTraceHelper EventDetailTracer => this.EventTraceHelper.IsTracingAtMostDetailedLevel ? this.EventTraceHelper : null;

static readonly SemaphoreSlim MaxConcurrentStarts = new SemaphoreSlim(5);
// We use this semaphore to limit how many partitions can be starting up at the same time on the same host
// because starting up a partition may temporarily consume a lot of CPU, I/O, and memory
const int ConcurrentStartsLimit = 5;
static readonly SemaphoreSlim MaxConcurrentStarts = new SemaphoreSlim(ConcurrentStartsLimit);

public Partition(
NetheriteOrchestrationService host,
Expand Down Expand Up @@ -93,9 +96,6 @@ public Partition(
EventTraceContext.Clear();

this.ErrorHandler = errorHandler;

this.TraceHelper.TracePartitionProgress("Starting", ref this.LastTransition, this.CurrentTimeMs, "");

errorHandler.Token.Register(() =>
{
this.TraceHelper.TracePartitionProgress("Terminated", ref this.LastTransition, this.CurrentTimeMs, "");
Expand All @@ -108,26 +108,36 @@ public Partition(
}

}, useSynchronizationContext: false);


// before we start the partition, we have to acquire the MaxConcurrentStarts semaphore
// (to prevent a host from being overwhelmed by the simultaneous start of too many partitions)
this.TraceHelper.TracePartitionProgress("Waiting", ref this.LastTransition, this.CurrentTimeMs, $"max={ConcurrentStartsLimit} available={MaxConcurrentStarts.CurrentCount}");
await MaxConcurrentStarts.WaitAsync();

// create or restore partition state from last snapshot
try
{
// create the state
this.State = ((TransportAbstraction.IHost) this.host).StorageLayer.CreatePartitionState(parameters);
this.TraceHelper.TracePartitionProgress("Starting", ref this.LastTransition, this.CurrentTimeMs, "");

// initialize timer for this partition
this.PendingTimers = new BatchTimer<PartitionEvent>(this.ErrorHandler.Token, this.TimersFired);
(long, int) inputQueuePosition;

// goes to storage to create or restore the partition state
var inputQueuePosition = await this.State.CreateOrRestoreAsync(this, this.ErrorHandler, inputQueueFingerprint).ConfigureAwait(false);
await using (new PartitionTimeout(errorHandler, "partition startup", TimeSpan.FromMinutes(this.Settings.PartitionStartupTimeoutMinutes)))
{
// create or restore partition state from last snapshot
// create the state
this.State = ((TransportAbstraction.IHost)this.host).StorageLayer.CreatePartitionState(parameters);

// start processing the timers
this.PendingTimers.Start($"Timer{this.PartitionId:D2}");
// initialize timer for this partition
this.PendingTimers = new BatchTimer<PartitionEvent>(this.ErrorHandler.Token, this.TimersFired);

// start processing the worker queues
this.State.StartProcessing();
// goes to storage to create or restore the partition state
inputQueuePosition = await this.State.CreateOrRestoreAsync(this, this.ErrorHandler, inputQueueFingerprint).ConfigureAwait(false);

// start processing the timers
this.PendingTimers.Start($"Timer{this.PartitionId:D2}");

// start processing the worker queues
this.State.StartProcessing();
}

this.TraceHelper.TracePartitionProgress("Started", ref this.LastTransition, this.CurrentTimeMs, $"nextInputQueuePosition={inputQueuePosition.Item1}.{inputQueuePosition.Item2}");
return inputQueuePosition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -22,8 +23,7 @@ class PartitionErrorHandler : IPartitionErrorHandler
readonly string taskHub;
readonly TaskCompletionSource<object> shutdownComplete;
readonly TransportAbstraction.IHost host;

public event Action OnShutdown;
readonly List<Task> disposeTasks;

public CancellationToken Token
{
Expand All @@ -44,6 +44,11 @@ public CancellationToken Token

public bool NormalTermination => this.terminationStatus == TerminatedNormally;

public bool WaitForDisposeTasks(TimeSpan timeout)
{
return Task.WhenAll(this.disposeTasks).Wait(timeout);
}

volatile int terminationStatus = NotTerminated;
const int NotTerminated = 0;
const int TerminatedWithError = 1;
Expand All @@ -59,6 +64,7 @@ public PartitionErrorHandler(int partitionId, ILogger logger, LogLevel logLevelL
this.taskHub = taskHubName;
this.shutdownComplete = new TaskCompletionSource<object>();
this.host = host;
this.disposeTasks = new List<Task>();
}

public void HandleError(string context, string message, Exception exception, bool terminatePartition, bool isWarning)
Expand Down Expand Up @@ -119,9 +125,8 @@ void Terminate()
{
try
{
this.logger?.LogDebug("Part{partition:D2} Started PartitionCancellation", this.partitionId);
// this immediately cancels all activities depending on the error handler token
this.cts.Cancel();
this.logger?.LogDebug("Part{partition:D2} Completed PartitionCancellation", this.partitionId);
}
catch (AggregateException aggregate)
{
Expand All @@ -134,47 +139,45 @@ void Terminate()
{
this.HandleError("PartitionErrorHandler.Terminate", "Exception in PartitionCancellation", e, false, true);
}
finally
{
// now that the partition is dead, run all the dispose tasks
Task.Run(this.DisposeAsync);
}
}

// we use a dedicated shutdown thread to help debugging and to contain damage if there are hangs
Thread shutdownThread = TrackedThreads.MakeTrackedThread(Shutdown, "PartitionShutdown");
shutdownThread.Start();

void Shutdown()
public void AddDisposeTask(string name, TimeSpan timeout, Action action)
{
this.disposeTasks.Add(new Task(() =>
{
Task disposeTask = Task.Run(action);
try
{
this.logger?.LogDebug("Part{partition:D2} Started PartitionShutdown", this.partitionId);

if (this.OnShutdown != null)
bool completedInTime = disposeTask.Wait(timeout);
if (!completedInTime)
{
this.OnShutdown();
}

this.cts.Dispose();

this.logger?.LogDebug("Part{partition:D2} Completed PartitionShutdown", this.partitionId);
}
catch (AggregateException aggregate)
{
foreach (var e in aggregate.InnerExceptions)
{
this.HandleError("PartitionErrorHandler.Shutdown", "Exception in PartitionShutdown", e, false, true);
this.HandleError("PartitionErrorHandler.DisposeAsync", $"Dispose Task {name} timed out after {timeout}", null, false, false);
}
}
catch (Exception e)
catch(Exception exception)
{
this.HandleError("PartitionErrorHandler.Shutdown", "Exception in PartitionShutdown", e, false, true);
this.HandleError("PartitionErrorHandler.DisposeAsync", $"Dispose Task {name} threw exception {exception}", null, false, false);
}

this.shutdownComplete.TrySetResult(null);
}
}));
}

public async Task<bool> WaitForTermination(TimeSpan timeout)
async Task DisposeAsync()
{
Task timeoutTask = Task.Delay(timeout);
var first = await Task.WhenAny(timeoutTask, this.shutdownComplete.Task);
return first == this.shutdownComplete.Task;
// execute all the dispose tasks in parallel
var tasks = this.disposeTasks;
foreach (var task in tasks)
{
task.Start();
}
await Task.WhenAll(tasks);

// we can now dispose the cancellation token source itself
this.cts.Dispose();
}
}
}
Loading

0 comments on commit 7921c1e

Please sign in to comment.