Skip to content

Commit

Permalink
revise how dispose tasks are organized so that they can all use the same timeout check and error reporting mechanism (#330)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianburckhardt authored Feb 13, 2024
1 parent 0abc16d commit 60c7d39
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ public interface IPartitionErrorHandler
CancellationToken Token { get; }

/// <summary>
/// A place to subscribe (potentially non-instantaneous) cleanup actions that execute on a dedicated thread.
/// Adds a task to be executed after the partition has been terminated.
/// </summary>
event Action OnShutdown;
void AddDisposeTask(string name, TimeSpan timeout, Action action);

/// <summary>
/// A boolean indicating whether the partition is terminated.
Expand All @@ -35,9 +35,11 @@ public interface IPartitionErrorHandler
bool NormalTermination { get; }

/// <summary>
/// Wait for all termination operations to finish
/// Waits for the dispose tasks to complete, up to the specified time limit.
/// </summary>
Task<bool> WaitForTermination(TimeSpan timeout);
/// <param name="timeout">The maximum time to wait for</param>
/// <returns>true if all tasks completed within the timeout, false otherwise.</returns>
bool WaitForDisposeTasks(TimeSpan timeout);

/// <summary>
/// Error handling for the partition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -22,8 +23,7 @@ class PartitionErrorHandler : IPartitionErrorHandler
readonly string taskHub;
readonly TaskCompletionSource<object> shutdownComplete;
readonly TransportAbstraction.IHost host;

public event Action OnShutdown;
readonly List<Task> disposeTasks;

public CancellationToken Token
{
Expand All @@ -44,6 +44,11 @@ public CancellationToken Token

public bool NormalTermination => this.terminationStatus == TerminatedNormally;

/// <summary>
/// Blocks the caller until every registered dispose task has completed, or until the timeout elapses.
/// </summary>
/// <param name="timeout">The maximum time to wait.</param>
/// <returns>true if all dispose tasks completed within the timeout, false otherwise.</returns>
public bool WaitForDisposeTasks(TimeSpan timeout)
{
    // combine the registered tasks into a single task so that one timeout covers all of them
    Task allDisposeTasks = Task.WhenAll(this.disposeTasks);
    bool completedInTime = allDisposeTasks.Wait(timeout);
    return completedInTime;
}

volatile int terminationStatus = NotTerminated;
const int NotTerminated = 0;
const int TerminatedWithError = 1;
Expand All @@ -59,6 +64,7 @@ public PartitionErrorHandler(int partitionId, ILogger logger, LogLevel logLevelL
this.taskHub = taskHubName;
this.shutdownComplete = new TaskCompletionSource<object>();
this.host = host;
this.disposeTasks = new List<Task>();
}

public void HandleError(string context, string message, Exception exception, bool terminatePartition, bool isWarning)
Expand Down Expand Up @@ -119,9 +125,8 @@ void Terminate()
{
try
{
this.logger?.LogDebug("Part{partition:D2} Started PartitionCancellation", this.partitionId);
// this immediately cancels all activities depending on the error handler token
this.cts.Cancel();
this.logger?.LogDebug("Part{partition:D2} Completed PartitionCancellation", this.partitionId);
}
catch (AggregateException aggregate)
{
Expand All @@ -134,47 +139,45 @@ void Terminate()
{
this.HandleError("PartitionErrorHandler.Terminate", "Exception in PartitionCancellation", e, false, true);
}
finally
{
// now that the partition is dead, run all the dispose tasks
Task.Run(this.DisposeAsync);
}
}

// we use a dedicated shutdown thread to help debugging and to contain damage if there are hangs
Thread shutdownThread = TrackedThreads.MakeTrackedThread(Shutdown, "PartitionShutdown");
shutdownThread.Start();

void Shutdown()
public void AddDisposeTask(string name, TimeSpan timeout, Action action)
{
this.disposeTasks.Add(new Task(() =>
{
Task disposeTask = Task.Run(action);
try
{
this.logger?.LogDebug("Part{partition:D2} Started PartitionShutdown", this.partitionId);

if (this.OnShutdown != null)
bool completedInTime = disposeTask.Wait(timeout);
if (!completedInTime)
{
this.OnShutdown();
}

this.cts.Dispose();

this.logger?.LogDebug("Part{partition:D2} Completed PartitionShutdown", this.partitionId);
}
catch (AggregateException aggregate)
{
foreach (var e in aggregate.InnerExceptions)
{
this.HandleError("PartitionErrorHandler.Shutdown", "Exception in PartitionShutdown", e, false, true);
this.HandleError("PartitionErrorHandler.DisposeAsync", $"Dispose Task {name} timed out after {timeout}", null, false, false);
}
}
catch (Exception e)
catch(Exception exception)
{
this.HandleError("PartitionErrorHandler.Shutdown", "Exception in PartitionShutdown", e, false, true);
this.HandleError("PartitionErrorHandler.DisposeAsync", $"Dispose Task {name} threw exception {exception}", null, false, false);
}

this.shutdownComplete.TrySetResult(null);
}
}));
}

public async Task<bool> WaitForTermination(TimeSpan timeout)
async Task DisposeAsync()
{
Task timeoutTask = Task.Delay(timeout);
var first = await Task.WhenAny(timeoutTask, this.shutdownComplete.Task);
return first == this.shutdownComplete.Task;
// execute all the dispose tasks in parallel
var tasks = this.disposeTasks;
foreach (var task in tasks)
{
task.Start();
}
await Task.WhenAll(tasks);

// we can now dispose the cancellation token source itself
this.cts.Dispose();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -755,9 +755,11 @@ public async Task MaintenanceLoopAsync()
}
}

this.TraceHelper.FasterProgress("Blob manager is terminating partition normally");

this.PartitionErrorHandler.TerminateNormally();

this.TraceHelper.LeaseProgress("Blob manager stopped");
this.TraceHelper.FasterProgress("Blob manager stopped");
}

public async Task RemoveObsoleteCheckpoints()
Expand Down
71 changes: 33 additions & 38 deletions src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public FasterKV(Partition partition, BlobManager blobManager, MemoryTracker memo
partition.Assert(this.fht.ReadCache == null, "Unexpected read cache");

this.terminationToken = partition.ErrorHandler.Token;
partition.ErrorHandler.OnShutdown += this.Shutdown;
partition.ErrorHandler.AddDisposeTask($"{nameof(FasterKV)}.{nameof(this.Dispose)}", TimeSpan.FromMinutes(2), this.Dispose);

this.compactionStopwatch = new Stopwatch();
this.compactionStopwatch.Start();
Expand All @@ -123,58 +123,53 @@ public FasterKV(Partition partition, BlobManager blobManager, MemoryTracker memo
this.blobManager.TraceHelper.FasterProgress("Constructed FasterKV");
}

void Shutdown()
void Dispose()
{
try
this.TraceHelper.FasterProgress("Disposing CacheTracker");
this.cacheTracker?.Dispose();

foreach (var s in this.sessionsToDisposeOnShutdown)
{
this.TraceHelper.FasterProgress("Disposing CacheTracker");
this.cacheTracker?.Dispose();
this.TraceHelper.FasterStorageProgress($"Disposing Temporary Session");
s.Dispose();
}

foreach (var s in this.sessionsToDisposeOnShutdown)
{
this.TraceHelper.FasterStorageProgress($"Disposing Temporary Session");
s.Dispose();
}
this.TraceHelper.FasterProgress("Disposing Main Session");
try
{
this.mainSession?.Dispose();
}
catch (OperationCanceledException)
{
// can happen during shutdown
}

this.TraceHelper.FasterProgress("Disposing Main Session");
this.TraceHelper.FasterProgress("Disposing Query Sessions");
foreach (var s in this.querySessions)
{
try
{
this.mainSession?.Dispose();
s?.Dispose();
}
catch(OperationCanceledException)
catch (OperationCanceledException)
{
// can happen during shutdown
}
}

this.TraceHelper.FasterProgress("Disposing Query Sessions");
foreach (var s in this.querySessions)
{
try
{
s?.Dispose();
}
catch (OperationCanceledException)
{
// can happen during shutdown
}
}

this.TraceHelper.FasterProgress("Disposing FasterKV");
this.fht.Dispose();
this.TraceHelper.FasterProgress("Disposing FasterKV");
this.fht.Dispose();

this.TraceHelper.FasterProgress($"Disposing Devices");
this.blobManager.DisposeDevices();
this.TraceHelper.FasterProgress($"Disposing Devices");
this.blobManager.DisposeDevices();

if (this.blobManager.FaultInjector != null)
{
this.TraceHelper.FasterProgress($"Unregistering from FaultInjector");
this.blobManager.FaultInjector.Disposed(this.blobManager);
}
}
catch (Exception e)
if (this.blobManager.FaultInjector != null)
{
this.blobManager.TraceHelper.FasterStorageError("Disposing FasterKV", e);
this.TraceHelper.FasterProgress($"Unregistering from FaultInjector");
this.blobManager.FaultInjector.Disposed(this.blobManager);
}

this.TraceHelper.FasterProgress("Disposed FasterKV");
}

double GetElapsedCompactionMilliseconds()
Expand Down
23 changes: 9 additions & 14 deletions src/DurableTask.Netherite/StorageLayer/Faster/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,19 @@ public FasterLog(BlobManager blobManager, NetheriteOrchestrationServiceSettings
this.blobManager = blobManager;
var eventlogsettings = blobManager.GetEventLogSettings(settings.UseSeparatePageBlobStorage, settings.FasterTuningParameters);
this.log = new FASTER.core.FasterLog(eventlogsettings);
blobManager.PartitionErrorHandler.OnShutdown += this.Shutdown;
blobManager.PartitionErrorHandler.AddDisposeTask($"{nameof(FasterLog)}.{nameof(this.Dispose)}", TimeSpan.FromMinutes(2), this.Dispose);
this.terminationToken = blobManager.PartitionErrorHandler.Token;
}
void Shutdown()

void Dispose()
{
try
{
this.blobManager.TraceHelper.FasterProgress("Disposing FasterLog");
this.log.Dispose();
this.blobManager.TraceHelper.FasterProgress("Disposing FasterLog");
this.log.Dispose();

this.blobManager.TraceHelper.FasterProgress("Disposing FasterLog Device");
this.blobManager.EventLogDevice.Dispose();
}
catch (Exception e)
{
this.blobManager.TraceHelper.FasterStorageError("Disposing FasterLog", e);
}
this.blobManager.TraceHelper.FasterProgress("Disposing FasterLog Device");
this.blobManager.EventLogDevice.Dispose();

this.blobManager.TraceHelper.FasterProgress("Disposed FasterLog");
}

public long BeginAddress => this.log.BeginAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ async Task TerminationWrapper(Task what)
if (this.partition.Settings.TestHooks?.ReplayChecker == null)
{
this.hangCheckTimer = new Timer(this.CheckForStuckWorkers, null, 0, 20000);
errorHandler.OnShutdown += () => this.hangCheckTimer.Dispose();
errorHandler.AddDisposeTask("DisposeHangTimer", TimeSpan.FromSeconds(10), () => this.hangCheckTimer.Dispose());
}

bool hasCheckpoint = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,18 +180,13 @@ async Task<PartitionIncarnation> StartPartitionAsync(PartitionIncarnation prior
await Task.Delay(addedDelay);
}

// we wait at most 20 seconds for the previous partition to terminate cleanly
int tries = 4;
var timeout = TimeSpan.FromSeconds(5);
// the previous incarnation has already been terminated. But it may not have cleaned up yet.
// We wait (but no more than 20 seconds) for the previous partition to dispose its assets
bool disposalComplete = prior.ErrorHandler.WaitForDisposeTasks(TimeSpan.FromSeconds(20));

while (!await prior.ErrorHandler.WaitForTermination(timeout))
if (!disposalComplete)
{
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} partition (incarnation {incarnation}) is still waiting for PartitionShutdown of previous incarnation", this.eventHubName, this.eventHubPartition, c.Incarnation);

if (--tries == 0)
{
break;
}
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} partition (incarnation {incarnation}) timed out waiting for disposal of previous incarnation", this.eventHubName, this.eventHubPartition, c.Incarnation);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ protected override async Task Process(IList<PartitionEvent> batch)
{
this.Partition = this.host.AddPartition(this.partitionId, this.sender);
var errorHandler = this.host.CreateErrorHandler(this.partitionId);
errorHandler.OnShutdown += () =>
errorHandler.AddDisposeTask("PartitionQueue.Termination", TimeSpan.FromSeconds(10), () =>
{
if (!this.isShuttingDown && this.testHooks?.FaultInjectionActive != true)
{
this.testHooks?.Error("MemoryTransport", "Unexpected partition termination");
}
this.Notify();
};
});

var (nextInputQueuePosition, _) = await this.Partition.CreateOrRestoreAsync(errorHandler, this.parameters, this.fingerPrint);

Expand Down

0 comments on commit 60c7d39

Please sign in to comment.