diff --git a/src/DurableTask.Netherite/Abstractions/PartitionState/IPartitionErrorHandler.cs b/src/DurableTask.Netherite/Abstractions/PartitionState/IPartitionErrorHandler.cs
index beaed34f..95ef5ab9 100644
--- a/src/DurableTask.Netherite/Abstractions/PartitionState/IPartitionErrorHandler.cs
+++ b/src/DurableTask.Netherite/Abstractions/PartitionState/IPartitionErrorHandler.cs
@@ -20,9 +20,9 @@ public interface IPartitionErrorHandler
CancellationToken Token { get; }
///
- /// A place to subscribe (potentially non-instantaneous) cleanup actions that execute on a dedicated thread.
+ /// Adds a task to be executed after the partition has been terminated.
///
- event Action OnShutdown;
+ void AddDisposeTask(string name, TimeSpan timeout, Action action);
///
/// A boolean indicating whether the partition is terminated.
@@ -35,9 +35,11 @@ public interface IPartitionErrorHandler
bool NormalTermination { get; }
///
- /// Wait for all termination operations to finish
+ /// Waits for the dispose tasks to complete, up to the specified time limit.
///
- Task WaitForTermination(TimeSpan timeout);
+ /// The maximum time to wait for
+ /// true if all tasks completed within the timeout, false otherwise.
+ bool WaitForDisposeTasks(TimeSpan timeout);
///
/// Error handling for the partition.
diff --git a/src/DurableTask.Netherite/OrchestrationService/PartitionErrorHandler.cs b/src/DurableTask.Netherite/OrchestrationService/PartitionErrorHandler.cs
index 7d5a74de..e4cf00db 100644
--- a/src/DurableTask.Netherite/OrchestrationService/PartitionErrorHandler.cs
+++ b/src/DurableTask.Netherite/OrchestrationService/PartitionErrorHandler.cs
@@ -4,6 +4,7 @@
namespace DurableTask.Netherite
{
using System;
+ using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
@@ -22,8 +23,7 @@ class PartitionErrorHandler : IPartitionErrorHandler
readonly string taskHub;
readonly TaskCompletionSource shutdownComplete;
readonly TransportAbstraction.IHost host;
-
- public event Action OnShutdown;
+ readonly List disposeTasks;
public CancellationToken Token
{
@@ -44,6 +44,11 @@ public CancellationToken Token
public bool NormalTermination => this.terminationStatus == TerminatedNormally;
+ public bool WaitForDisposeTasks(TimeSpan timeout)
+ {
+ return Task.WhenAll(this.disposeTasks).Wait(timeout);
+ }
+
volatile int terminationStatus = NotTerminated;
const int NotTerminated = 0;
const int TerminatedWithError = 1;
@@ -59,6 +64,7 @@ public PartitionErrorHandler(int partitionId, ILogger logger, LogLevel logLevelL
this.taskHub = taskHubName;
this.shutdownComplete = new TaskCompletionSource();
this.host = host;
+ this.disposeTasks = new List();
}
public void HandleError(string context, string message, Exception exception, bool terminatePartition, bool isWarning)
@@ -119,9 +125,8 @@ void Terminate()
{
try
{
- this.logger?.LogDebug("Part{partition:D2} Started PartitionCancellation", this.partitionId);
+ // this immediately cancels all activities depending on the error handler token
this.cts.Cancel();
- this.logger?.LogDebug("Part{partition:D2} Completed PartitionCancellation", this.partitionId);
}
catch (AggregateException aggregate)
{
@@ -134,47 +139,45 @@ void Terminate()
{
this.HandleError("PartitionErrorHandler.Terminate", "Exception in PartitionCancellation", e, false, true);
}
+ finally
+ {
+ // now that the partition is dead, run all the dispose tasks
+ Task.Run(this.DisposeAsync);
+ }
+ }
- // we use a dedicated shutdown thread to help debugging and to contain damage if there are hangs
- Thread shutdownThread = TrackedThreads.MakeTrackedThread(Shutdown, "PartitionShutdown");
- shutdownThread.Start();
-
- void Shutdown()
+ public void AddDisposeTask(string name, TimeSpan timeout, Action action)
+ {
+ this.disposeTasks.Add(new Task(() =>
{
+ Task disposeTask = Task.Run(action);
try
{
- this.logger?.LogDebug("Part{partition:D2} Started PartitionShutdown", this.partitionId);
-
- if (this.OnShutdown != null)
+ bool completedInTime = disposeTask.Wait(timeout);
+ if (!completedInTime)
{
- this.OnShutdown();
- }
-
- this.cts.Dispose();
-
- this.logger?.LogDebug("Part{partition:D2} Completed PartitionShutdown", this.partitionId);
- }
- catch (AggregateException aggregate)
- {
- foreach (var e in aggregate.InnerExceptions)
- {
- this.HandleError("PartitionErrorHandler.Shutdown", "Exception in PartitionShutdown", e, false, true);
+ this.HandleError("PartitionErrorHandler.DisposeAsync", $"Dispose Task {name} timed out after {timeout}", null, false, false);
}
}
- catch (Exception e)
+ catch(Exception exception)
{
- this.HandleError("PartitionErrorHandler.Shutdown", "Exception in PartitionShutdown", e, false, true);
+ this.HandleError("PartitionErrorHandler.DisposeAsync", $"Dispose Task {name} threw exception {exception}", null, false, false);
}
-
- this.shutdownComplete.TrySetResult(null);
- }
+ }));
}
- public async Task WaitForTermination(TimeSpan timeout)
+ async Task DisposeAsync()
{
- Task timeoutTask = Task.Delay(timeout);
- var first = await Task.WhenAny(timeoutTask, this.shutdownComplete.Task);
- return first == this.shutdownComplete.Task;
+ // execute all the dispose tasks in parallel
+ var tasks = this.disposeTasks;
+ foreach (var task in tasks)
+ {
+ task.Start();
+ }
+ await Task.WhenAll(tasks);
+
+ // we can now dispose the cancellation token source itself
+ this.cts.Dispose();
}
}
}
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs
index f422d682..453c4911 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs
@@ -755,9 +755,11 @@ public async Task MaintenanceLoopAsync()
}
}
+ this.TraceHelper.FasterProgress("Blob manager is terminating partition normally");
+
this.PartitionErrorHandler.TerminateNormally();
- this.TraceHelper.LeaseProgress("Blob manager stopped");
+ this.TraceHelper.FasterProgress("Blob manager stopped");
}
public async Task RemoveObsoleteCheckpoints()
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
index 71aa30b9..135dd4f4 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
@@ -113,7 +113,7 @@ public FasterKV(Partition partition, BlobManager blobManager, MemoryTracker memo
partition.Assert(this.fht.ReadCache == null, "Unexpected read cache");
this.terminationToken = partition.ErrorHandler.Token;
- partition.ErrorHandler.OnShutdown += this.Shutdown;
+ partition.ErrorHandler.AddDisposeTask($"{nameof(FasterKV)}.{nameof(this.Dispose)}", TimeSpan.FromMinutes(2), this.Dispose);
this.compactionStopwatch = new Stopwatch();
this.compactionStopwatch.Start();
@@ -123,58 +123,53 @@ public FasterKV(Partition partition, BlobManager blobManager, MemoryTracker memo
this.blobManager.TraceHelper.FasterProgress("Constructed FasterKV");
}
- void Shutdown()
+ void Dispose()
{
- try
+ this.TraceHelper.FasterProgress("Disposing CacheTracker");
+ this.cacheTracker?.Dispose();
+
+ foreach (var s in this.sessionsToDisposeOnShutdown)
{
- this.TraceHelper.FasterProgress("Disposing CacheTracker");
- this.cacheTracker?.Dispose();
+ this.TraceHelper.FasterStorageProgress($"Disposing Temporary Session");
+ s.Dispose();
+ }
- foreach (var s in this.sessionsToDisposeOnShutdown)
- {
- this.TraceHelper.FasterStorageProgress($"Disposing Temporary Session");
- s.Dispose();
- }
+ this.TraceHelper.FasterProgress("Disposing Main Session");
+ try
+ {
+ this.mainSession?.Dispose();
+ }
+ catch (OperationCanceledException)
+ {
+ // can happen during shutdown
+ }
- this.TraceHelper.FasterProgress("Disposing Main Session");
+ this.TraceHelper.FasterProgress("Disposing Query Sessions");
+ foreach (var s in this.querySessions)
+ {
try
{
- this.mainSession?.Dispose();
+ s?.Dispose();
}
- catch(OperationCanceledException)
+ catch (OperationCanceledException)
{
// can happen during shutdown
}
+ }
- this.TraceHelper.FasterProgress("Disposing Query Sessions");
- foreach (var s in this.querySessions)
- {
- try
- {
- s?.Dispose();
- }
- catch (OperationCanceledException)
- {
- // can happen during shutdown
- }
- }
-
- this.TraceHelper.FasterProgress("Disposing FasterKV");
- this.fht.Dispose();
+ this.TraceHelper.FasterProgress("Disposing FasterKV");
+ this.fht.Dispose();
- this.TraceHelper.FasterProgress($"Disposing Devices");
- this.blobManager.DisposeDevices();
+ this.TraceHelper.FasterProgress($"Disposing Devices");
+ this.blobManager.DisposeDevices();
- if (this.blobManager.FaultInjector != null)
- {
- this.TraceHelper.FasterProgress($"Unregistering from FaultInjector");
- this.blobManager.FaultInjector.Disposed(this.blobManager);
- }
- }
- catch (Exception e)
+ if (this.blobManager.FaultInjector != null)
{
- this.blobManager.TraceHelper.FasterStorageError("Disposing FasterKV", e);
+ this.TraceHelper.FasterProgress($"Unregistering from FaultInjector");
+ this.blobManager.FaultInjector.Disposed(this.blobManager);
}
+
+ this.TraceHelper.FasterProgress("Disposed FasterKV");
}
double GetElapsedCompactionMilliseconds()
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterLog.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterLog.cs
index 20949733..cad4e5a2 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterLog.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterLog.cs
@@ -20,24 +20,19 @@ public FasterLog(BlobManager blobManager, NetheriteOrchestrationServiceSettings
this.blobManager = blobManager;
var eventlogsettings = blobManager.GetEventLogSettings(settings.UseSeparatePageBlobStorage, settings.FasterTuningParameters);
this.log = new FASTER.core.FasterLog(eventlogsettings);
- blobManager.PartitionErrorHandler.OnShutdown += this.Shutdown;
+ blobManager.PartitionErrorHandler.AddDisposeTask($"{nameof(FasterLog)}.{nameof(this.Dispose)}", TimeSpan.FromMinutes(2), this.Dispose);
this.terminationToken = blobManager.PartitionErrorHandler.Token;
}
-
- void Shutdown()
+
+ void Dispose()
{
- try
- {
- this.blobManager.TraceHelper.FasterProgress("Disposing FasterLog");
- this.log.Dispose();
+ this.blobManager.TraceHelper.FasterProgress("Disposing FasterLog");
+ this.log.Dispose();
- this.blobManager.TraceHelper.FasterProgress("Disposing FasterLog Device");
- this.blobManager.EventLogDevice.Dispose();
- }
- catch (Exception e)
- {
- this.blobManager.TraceHelper.FasterStorageError("Disposing FasterLog", e);
- }
+ this.blobManager.TraceHelper.FasterProgress("Disposing FasterLog Device");
+ this.blobManager.EventLogDevice.Dispose();
+
+ this.blobManager.TraceHelper.FasterProgress("Disposed FasterLog");
}
public long BeginAddress => this.log.BeginAddress;
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs b/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs
index 18dddf85..f94e465a 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs
@@ -115,7 +115,7 @@ async Task TerminationWrapper(Task what)
if (this.partition.Settings.TestHooks?.ReplayChecker == null)
{
this.hangCheckTimer = new Timer(this.CheckForStuckWorkers, null, 0, 20000);
- errorHandler.OnShutdown += () => this.hangCheckTimer.Dispose();
+ errorHandler.AddDisposeTask("DisposeHangTimer", TimeSpan.FromSeconds(10), () => this.hangCheckTimer.Dispose());
}
bool hasCheckpoint = false;
diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsProcessor.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsProcessor.cs
index a5873f26..6c451995 100644
--- a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsProcessor.cs
+++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsProcessor.cs
@@ -180,18 +180,13 @@ async Task StartPartitionAsync(PartitionIncarnation prior
await Task.Delay(addedDelay);
}
- // we wait at most 20 seconds for the previous partition to terminate cleanly
- int tries = 4;
- var timeout = TimeSpan.FromSeconds(5);
+ // the previous incarnation has already been terminated. But it may not have cleaned up yet.
+ // We wait (but no more than 20 seconds) for the previous partition to dispose its assets
+ bool disposalComplete = prior.ErrorHandler.WaitForDisposeTasks(TimeSpan.FromSeconds(20));
- while (!await prior.ErrorHandler.WaitForTermination(timeout))
+ if (!disposalComplete)
{
- this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} partition (incarnation {incarnation}) is still waiting for PartitionShutdown of previous incarnation", this.eventHubName, this.eventHubPartition, c.Incarnation);
-
- if (--tries == 0)
- {
- break;
- }
+ this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} partition (incarnation {incarnation}) timed out waiting for disposal of previous incarnation", this.eventHubName, this.eventHubPartition, c.Incarnation);
}
}
}
diff --git a/src/DurableTask.Netherite/TransportLayer/SingleHost/PartitionQueue.cs b/src/DurableTask.Netherite/TransportLayer/SingleHost/PartitionQueue.cs
index 22eb20a2..e8ed541e 100644
--- a/src/DurableTask.Netherite/TransportLayer/SingleHost/PartitionQueue.cs
+++ b/src/DurableTask.Netherite/TransportLayer/SingleHost/PartitionQueue.cs
@@ -78,14 +78,14 @@ protected override async Task Process(IList batch)
{
this.Partition = this.host.AddPartition(this.partitionId, this.sender);
var errorHandler = this.host.CreateErrorHandler(this.partitionId);
- errorHandler.OnShutdown += () =>
+ errorHandler.AddDisposeTask("PartitionQueue.Termination", TimeSpan.FromSeconds(10), () =>
{
if (!this.isShuttingDown && this.testHooks?.FaultInjectionActive != true)
{
this.testHooks?.Error("MemoryTransport", "Unexpected partition termination");
}
this.Notify();
- };
+ });
var (nextInputQueuePosition, _) = await this.Partition.CreateOrRestoreAsync(errorHandler, this.parameters, this.fingerPrint);