Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revise how dispose tasks are organized #330

Merged
merged 1 commit into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ public interface IPartitionErrorHandler
CancellationToken Token { get; }

/// <summary>
/// A place to subscribe (potentially non-instantaneous) cleanup actions that execute on a dedicated thread.
/// Adds a task to be executed after the partition has been terminated.
/// </summary>
event Action OnShutdown;
void AddDisposeTask(string name, TimeSpan timeout, Action action);

/// <summary>
/// A boolean indicating whether the partition is terminated.
Expand All @@ -35,9 +35,11 @@ public interface IPartitionErrorHandler
bool NormalTermination { get; }

/// <summary>
/// Wait for all termination operations to finish
/// Waits for the dispose tasks to complete, up to the specified time limit.
/// </summary>
Task<bool> WaitForTermination(TimeSpan timeout);
/// <param name="timeout">The maximum time to wait.</param>
/// <returns>true if all tasks completed within the timeout, false otherwise.</returns>
bool WaitForDisposeTasks(TimeSpan timeout);

/// <summary>
/// Error handling for the partition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -22,8 +23,7 @@ class PartitionErrorHandler : IPartitionErrorHandler
readonly string taskHub;
readonly TaskCompletionSource<object> shutdownComplete;
readonly TransportAbstraction.IHost host;

public event Action OnShutdown;
readonly List<Task> disposeTasks;

public CancellationToken Token
{
Expand All @@ -44,6 +44,11 @@ public CancellationToken Token

public bool NormalTermination => this.terminationStatus == TerminatedNormally;

/// <summary>
/// Blocks the calling thread until every registered dispose task has finished, or until the timeout elapses.
/// </summary>
/// <param name="timeout">The maximum time to wait.</param>
/// <returns>true if all dispose tasks completed within the timeout; false otherwise.</returns>
public bool WaitForDisposeTasks(TimeSpan timeout)
{
    // combine the registered dispose tasks into one task so a single wait covers them all
    Task allDisposeTasks = Task.WhenAll(this.disposeTasks);
    return allDisposeTasks.Wait(timeout);
}

volatile int terminationStatus = NotTerminated;
const int NotTerminated = 0;
const int TerminatedWithError = 1;
Expand All @@ -59,6 +64,7 @@ public PartitionErrorHandler(int partitionId, ILogger logger, LogLevel logLevelL
this.taskHub = taskHubName;
this.shutdownComplete = new TaskCompletionSource<object>();
this.host = host;
this.disposeTasks = new List<Task>();
}

public void HandleError(string context, string message, Exception exception, bool terminatePartition, bool isWarning)
Expand Down Expand Up @@ -119,9 +125,8 @@ void Terminate()
{
try
{
this.logger?.LogDebug("Part{partition:D2} Started PartitionCancellation", this.partitionId);
// this immediately cancels all activities depending on the error handler token
this.cts.Cancel();
this.logger?.LogDebug("Part{partition:D2} Completed PartitionCancellation", this.partitionId);
}
catch (AggregateException aggregate)
{
Expand All @@ -134,47 +139,45 @@ void Terminate()
{
this.HandleError("PartitionErrorHandler.Terminate", "Exception in PartitionCancellation", e, false, true);
}
finally
{
// now that the partition is dead, run all the dispose tasks
Task.Run(this.DisposeAsync);
}
}

// we use a dedicated shutdown thread to help debugging and to contain damage if there are hangs
Thread shutdownThread = TrackedThreads.MakeTrackedThread(Shutdown, "PartitionShutdown");
shutdownThread.Start();

void Shutdown()
public void AddDisposeTask(string name, TimeSpan timeout, Action action)
{
this.disposeTasks.Add(new Task(() =>
{
Task disposeTask = Task.Run(action);
try
{
this.logger?.LogDebug("Part{partition:D2} Started PartitionShutdown", this.partitionId);

if (this.OnShutdown != null)
bool completedInTime = disposeTask.Wait(timeout);
if (!completedInTime)
{
this.OnShutdown();
}

this.cts.Dispose();

this.logger?.LogDebug("Part{partition:D2} Completed PartitionShutdown", this.partitionId);
}
catch (AggregateException aggregate)
{
foreach (var e in aggregate.InnerExceptions)
{
this.HandleError("PartitionErrorHandler.Shutdown", "Exception in PartitionShutdown", e, false, true);
this.HandleError("PartitionErrorHandler.DisposeAsync", $"Dispose Task {name} timed out after {timeout}", null, false, false);
}
}
catch (Exception e)
catch(Exception exception)
{
this.HandleError("PartitionErrorHandler.Shutdown", "Exception in PartitionShutdown", e, false, true);
this.HandleError("PartitionErrorHandler.DisposeAsync", $"Dispose Task {name} threw exception {exception}", null, false, false);
}

this.shutdownComplete.TrySetResult(null);
}
}));
}

public async Task<bool> WaitForTermination(TimeSpan timeout)
async Task DisposeAsync()
{
Task timeoutTask = Task.Delay(timeout);
var first = await Task.WhenAny(timeoutTask, this.shutdownComplete.Task);
return first == this.shutdownComplete.Task;
// execute all the dispose tasks in parallel
var tasks = this.disposeTasks;
foreach (var task in tasks)
{
task.Start();
}
await Task.WhenAll(tasks);

// we can now dispose the cancellation token source itself
this.cts.Dispose();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -754,9 +754,11 @@ public async Task MaintenanceLoopAsync()
}
}

this.TraceHelper.FasterProgress("Blob manager is terminating partition normally");

this.PartitionErrorHandler.TerminateNormally();

this.TraceHelper.LeaseProgress("Blob manager stopped");
this.TraceHelper.FasterProgress("Blob manager stopped");
}

public async Task RemoveObsoleteCheckpoints()
Expand Down
71 changes: 33 additions & 38 deletions src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public FasterKV(Partition partition, BlobManager blobManager, MemoryTracker memo
partition.Assert(this.fht.ReadCache == null, "Unexpected read cache");

this.terminationToken = partition.ErrorHandler.Token;
partition.ErrorHandler.OnShutdown += this.Shutdown;
partition.ErrorHandler.AddDisposeTask($"{nameof(FasterKV)}.{nameof(this.Dispose)}", TimeSpan.FromMinutes(2), this.Dispose);

this.compactionStopwatch = new Stopwatch();
this.compactionStopwatch.Start();
Expand All @@ -122,58 +122,53 @@ public FasterKV(Partition partition, BlobManager blobManager, MemoryTracker memo
this.blobManager.TraceHelper.FasterProgress("Constructed FasterKV");
}

void Shutdown()
void Dispose()
{
try
this.TraceHelper.FasterProgress("Disposing CacheTracker");
this.cacheTracker?.Dispose();

foreach (var s in this.sessionsToDisposeOnShutdown)
{
this.TraceHelper.FasterProgress("Disposing CacheTracker");
this.cacheTracker?.Dispose();
this.TraceHelper.FasterStorageProgress($"Disposing Temporary Session");
s.Dispose();
}

foreach (var s in this.sessionsToDisposeOnShutdown)
{
this.TraceHelper.FasterStorageProgress($"Disposing Temporary Session");
s.Dispose();
}
this.TraceHelper.FasterProgress("Disposing Main Session");
try
{
this.mainSession?.Dispose();
}
catch (OperationCanceledException)
{
// can happen during shutdown
}

this.TraceHelper.FasterProgress("Disposing Main Session");
this.TraceHelper.FasterProgress("Disposing Query Sessions");
foreach (var s in this.querySessions)
{
try
{
this.mainSession?.Dispose();
s?.Dispose();
}
catch(OperationCanceledException)
catch (OperationCanceledException)
{
// can happen during shutdown
}
}

this.TraceHelper.FasterProgress("Disposing Query Sessions");
foreach (var s in this.querySessions)
{
try
{
s?.Dispose();
}
catch (OperationCanceledException)
{
// can happen during shutdown
}
}

this.TraceHelper.FasterProgress("Disposing FasterKV");
this.fht.Dispose();
this.TraceHelper.FasterProgress("Disposing FasterKV");
this.fht.Dispose();

this.TraceHelper.FasterProgress($"Disposing Devices");
this.blobManager.DisposeDevices();
this.TraceHelper.FasterProgress($"Disposing Devices");
this.blobManager.DisposeDevices();

if (this.blobManager.FaultInjector != null)
{
this.TraceHelper.FasterProgress($"Unregistering from FaultInjector");
this.blobManager.FaultInjector.Disposed(this.blobManager);
}
}
catch (Exception e)
if (this.blobManager.FaultInjector != null)
{
this.blobManager.TraceHelper.FasterStorageError("Disposing FasterKV", e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we're no longer logging this exception. Where would it get logged now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See PartitionErrorHandler.AddDisposeTask.

It shows how dispose tasks are executed, and how their timeouts and exceptions are reported through HandleError.

this.TraceHelper.FasterProgress($"Unregistering from FaultInjector");
this.blobManager.FaultInjector.Disposed(this.blobManager);
}

this.TraceHelper.FasterProgress("Disposed FasterKV");
}

double GetElapsedCompactionMilliseconds()
Expand Down
23 changes: 9 additions & 14 deletions src/DurableTask.Netherite/StorageLayer/Faster/FasterLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,19 @@ public FasterLog(BlobManager blobManager, NetheriteOrchestrationServiceSettings
this.blobManager = blobManager;
var eventlogsettings = blobManager.GetEventLogSettings(settings.UseSeparatePageBlobStorage, settings.FasterTuningParameters);
this.log = new FASTER.core.FasterLog(eventlogsettings);
blobManager.PartitionErrorHandler.OnShutdown += this.Shutdown;
blobManager.PartitionErrorHandler.AddDisposeTask($"{nameof(FasterLog)}.{nameof(this.Dispose)}", TimeSpan.FromMinutes(2), this.Dispose);
this.terminationToken = blobManager.PartitionErrorHandler.Token;
}
void Shutdown()

void Dispose()
{
try
{
this.blobManager.TraceHelper.FasterProgress("Disposing FasterLog");
this.log.Dispose();
this.blobManager.TraceHelper.FasterProgress("Disposing FasterLog");
this.log.Dispose();

this.blobManager.TraceHelper.FasterProgress("Disposing FasterLog Device");
this.blobManager.EventLogDevice.Dispose();
}
catch (Exception e)
{
this.blobManager.TraceHelper.FasterStorageError("Disposing FasterLog", e);
}
this.blobManager.TraceHelper.FasterProgress("Disposing FasterLog Device");
this.blobManager.EventLogDevice.Dispose();

this.blobManager.TraceHelper.FasterProgress("Disposed FasterLog");
}

public long BeginAddress => this.log.BeginAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ async Task TerminationWrapper(Task what)
if (this.partition.Settings.TestHooks?.ReplayChecker == null)
{
this.hangCheckTimer = new Timer(this.CheckForStuckWorkers, null, 0, 20000);
errorHandler.OnShutdown += () => this.hangCheckTimer.Dispose();
errorHandler.AddDisposeTask("DisposeHangTimer", TimeSpan.FromSeconds(10), () => this.hangCheckTimer.Dispose());
}

bool hasCheckpoint = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,18 +180,13 @@ async Task<PartitionIncarnation> StartPartitionAsync(PartitionIncarnation prior
await Task.Delay(addedDelay);
}

// we wait at most 20 seconds for the previous partition to terminate cleanly
int tries = 4;
var timeout = TimeSpan.FromSeconds(5);
// the previous incarnation has already been terminated. But it may not have cleaned up yet.
// We wait (but no more than 20 seconds) for the previous partition to dispose its assets
bool disposalComplete = prior.ErrorHandler.WaitForDisposeTasks(TimeSpan.FromSeconds(20));

while (!await prior.ErrorHandler.WaitForTermination(timeout))
if (!disposalComplete)
{
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} partition (incarnation {incarnation}) is still waiting for PartitionShutdown of previous incarnation", this.eventHubName, this.eventHubPartition, c.Incarnation);

if (--tries == 0)
{
break;
}
Comment on lines -191 to -194
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it safe to no longer have this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Instead of waiting in a loop four times with a 5-second timeout each, it now waits once with a 20-second timeout.

this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} partition (incarnation {incarnation}) timed out waiting for disposal of previous incarnation", this.eventHubName, this.eventHubPartition, c.Incarnation);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ protected override async Task Process(IList<PartitionEvent> batch)
{
this.Partition = this.host.AddPartition(this.partitionId, this.sender);
var errorHandler = this.host.CreateErrorHandler(this.partitionId);
errorHandler.OnShutdown += () =>
errorHandler.AddDisposeTask("PartitionQueue.Termination", TimeSpan.FromSeconds(10), () =>
{
if (!this.isShuttingDown && this.testHooks?.FaultInjectionActive != true)
{
this.testHooks?.Error("MemoryTransport", "Unexpected partition termination");
}
this.Notify();
};
});

var (nextInputQueuePosition, _) = await this.Partition.CreateOrRestoreAsync(errorHandler, this.parameters, this.fingerPrint);

Expand Down