diff --git a/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj b/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj index b6d01aba..9d2edf93 100644 --- a/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj +++ b/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj @@ -26,7 +26,7 @@ 1 4 - 1 + 2 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).0.0.0 diff --git a/src/DurableTask.Netherite/Abstractions/PartitionState/IPartitionErrorHandler.cs b/src/DurableTask.Netherite/Abstractions/PartitionState/IPartitionErrorHandler.cs index beaed34f..95ef5ab9 100644 --- a/src/DurableTask.Netherite/Abstractions/PartitionState/IPartitionErrorHandler.cs +++ b/src/DurableTask.Netherite/Abstractions/PartitionState/IPartitionErrorHandler.cs @@ -20,9 +20,9 @@ public interface IPartitionErrorHandler CancellationToken Token { get; } /// - /// A place to subscribe (potentially non-instantaneous) cleanup actions that execute on a dedicated thread. + /// Adds a task to be executed after the partition has been terminated. /// - event Action OnShutdown; + void AddDisposeTask(string name, TimeSpan timeout, Action action); /// /// A boolean indicating whether the partition is terminated. @@ -35,9 +35,11 @@ public interface IPartitionErrorHandler bool NormalTermination { get; } /// - /// Wait for all termination operations to finish + /// Waits for the dispose tasks to complete, up to the specified time limit. /// - Task WaitForTermination(TimeSpan timeout); + /// The maximum time to wait for + /// true if all tasks completed within the timeout, false otherwise. + bool WaitForDisposeTasks(TimeSpan timeout); /// /// Error handling for the partition. diff --git a/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObjectKey.cs b/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObjectKey.cs index a819223e..324e0a85 100644 --- a/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObjectKey.cs +++ b/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObjectKey.cs @@ -7,11 +7,12 @@ namespace DurableTask.Netherite using System.Collections.Generic; using System.IO; using System.Linq; + using System.Runtime.Serialization; /// /// Represents a key used to identify instances. /// - struct TrackedObjectKey + struct TrackedObjectKey { public TrackedObjectType ObjectType; public string InstanceId; @@ -70,12 +71,12 @@ public static int Compare(ref TrackedObjectKey key1, ref TrackedObjectKey key2) public class Comparer : IComparer { - public int Compare(TrackedObjectKey x, TrackedObjectKey y) => TrackedObjectKey.Compare(ref x, ref y); + public int Compare(TrackedObjectKey x, TrackedObjectKey y) => TrackedObjectKey.Compare(ref x, ref y); } public override int GetHashCode() { - return (this.InstanceId?.GetHashCode() ?? 0) + (int) this.ObjectType; + return (this.InstanceId?.GetHashCode() ?? 0) + (int)this.ObjectType; } public override bool Equals(object obj) @@ -97,55 +98,37 @@ public override bool Equals(object obj) // convenient constructors for non-singletons - public static TrackedObjectKey History(string id) => new TrackedObjectKey() - { - ObjectType = TrackedObjectType.History, - InstanceId = id, - }; - public static TrackedObjectKey Instance(string id) => new TrackedObjectKey() - { - ObjectType = TrackedObjectType.Instance, - InstanceId = id, - }; + public static TrackedObjectKey History(string id) => new TrackedObjectKey() + { + ObjectType = TrackedObjectType.History, + InstanceId = id, + }; + public static TrackedObjectKey Instance(string id) => new TrackedObjectKey() + { + ObjectType = TrackedObjectType.Instance, + InstanceId = id, + }; public static TrackedObject Factory(TrackedObjectKey key) => key.ObjectType switch - { - TrackedObjectType.Activities => new ActivitiesState(), - TrackedObjectType.Dedup => new DedupState(), - TrackedObjectType.Outbox => new OutboxState(), - TrackedObjectType.Reassembly => new ReassemblyState(), - TrackedObjectType.Sessions => new SessionsState(), - TrackedObjectType.Timers => new TimersState(), - TrackedObjectType.Prefetch => new PrefetchState(), - TrackedObjectType.Queries => new QueriesState(), - TrackedObjectType.Stats => new StatsState(), - TrackedObjectType.History => new HistoryState() { InstanceId = key.InstanceId }, - TrackedObjectType.Instance => new InstanceState() { InstanceId = key.InstanceId }, - _ => throw new ArgumentException("invalid key", nameof(key)), - }; - - public static IEnumerable GetSingletons() + { + TrackedObjectType.Activities => new ActivitiesState(), + TrackedObjectType.Dedup => new DedupState(), + TrackedObjectType.Outbox => new OutboxState(), + TrackedObjectType.Reassembly => new ReassemblyState(), + TrackedObjectType.Sessions => new SessionsState(), + TrackedObjectType.Timers => new TimersState(), + TrackedObjectType.Prefetch => new PrefetchState(), + TrackedObjectType.Queries => new QueriesState(), + TrackedObjectType.Stats => new StatsState(), + TrackedObjectType.History => new HistoryState() { InstanceId = key.InstanceId }, + TrackedObjectType.Instance => new InstanceState() { InstanceId = key.InstanceId }, + _ => throw new ArgumentException("invalid key", nameof(key)), + }; + + public static IEnumerable GetSingletons() => Enum.GetValues(typeof(TrackedObjectType)).Cast().Where(t => IsSingletonType(t)).Select(t => new TrackedObjectKey() { ObjectType = t }); - public override string ToString() + public override string ToString() => this.InstanceId == null ? this.ObjectType.ToString() : $"{this.ObjectType}-{this.InstanceId}"; - - public void Deserialize(BinaryReader reader) - { - this.ObjectType = (TrackedObjectType) reader.ReadByte(); - if (!IsSingletonType(this.ObjectType)) - { - this.InstanceId = reader.ReadString(); - } - } - - public void Serialize(BinaryWriter writer) - { - writer.Write((byte) this.ObjectType); - if (!IsSingletonType(this.ObjectType)) - { - writer.Write(this.InstanceId); - } - } } } diff --git a/src/DurableTask.Netherite/DurableTask.Netherite.csproj b/src/DurableTask.Netherite/DurableTask.Netherite.csproj index 9a14927b..7cc7c789 100644 --- a/src/DurableTask.Netherite/DurableTask.Netherite.csproj +++ b/src/DurableTask.Netherite/DurableTask.Netherite.csproj @@ -26,7 +26,7 @@ 1 4 - 1 + 2 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).0.0.0 @@ -56,7 +56,7 @@ - + diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs index 8f13bb5f..687a7670 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs @@ -115,9 +115,10 @@ public class NetheriteOrchestrationServiceSettings /// /// Whether to checkpoint the current state of a partition when it is stopped. This improves recovery time but - /// lengthens shutdown time. + /// lengthens shutdown time and can cause memory pressure if many partitions are stopped at the same time, + /// for example if a host is shutting down. /// - public bool TakeStateCheckpointWhenStoppingPartition { get; set; } = true; + public bool TakeStateCheckpointWhenStoppingPartition { get; set; } = false; /// /// A limit on how many bytes to append to the log before initiating a state checkpoint. The default is 20MB. @@ -168,6 +169,11 @@ public class NetheriteOrchestrationServiceSettings /// public int PackPartitionTaskMessages { get; set; } = 100; + /// + /// Time limit for partition startup, in minutes. + /// + public int PartitionStartupTimeoutMinutes { get; set; } = 15; + /// /// Allows attaching additional checkers and debuggers during testing. /// diff --git a/src/DurableTask.Netherite/OrchestrationService/Partition.cs b/src/DurableTask.Netherite/OrchestrationService/Partition.cs index 9b6e3406..7f641cce 100644 --- a/src/DurableTask.Netherite/OrchestrationService/Partition.cs +++ b/src/DurableTask.Netherite/OrchestrationService/Partition.cs @@ -55,7 +55,10 @@ partial class Partition : TransportAbstraction.IPartition // A little helper property that allows us to conventiently check the condition for low-level event tracing public EventTraceHelper EventDetailTracer => this.EventTraceHelper.IsTracingAtMostDetailedLevel ? this.EventTraceHelper : null; - static readonly SemaphoreSlim MaxConcurrentStarts = new SemaphoreSlim(5); + // We use this semaphore to limit how many partitions can be starting up at the same time on the same host + // because starting up a partition may temporarily consume a lot of CPU, I/O, and memory + const int ConcurrentStartsLimit = 5; + static readonly SemaphoreSlim MaxConcurrentStarts = new SemaphoreSlim(ConcurrentStartsLimit); public Partition( NetheriteOrchestrationService host, @@ -93,9 +96,6 @@ public Partition( EventTraceContext.Clear(); this.ErrorHandler = errorHandler; - - this.TraceHelper.TracePartitionProgress("Starting", ref this.LastTransition, this.CurrentTimeMs, ""); - errorHandler.Token.Register(() => { this.TraceHelper.TracePartitionProgress("Terminated", ref this.LastTransition, this.CurrentTimeMs, ""); @@ -108,26 +108,36 @@ public Partition( } }, useSynchronizationContext: false); - + + // before we start the partition, we have to acquire the MaxConcurrentStarts semaphore + // (to prevent a host from being overwhelmed by the simultaneous start of too many partitions) + this.TraceHelper.TracePartitionProgress("Waiting", ref this.LastTransition, this.CurrentTimeMs, $"max={ConcurrentStartsLimit} available={MaxConcurrentStarts.CurrentCount}"); await MaxConcurrentStarts.WaitAsync(); - // create or restore partition state from last snapshot try { - // create the state - this.State = ((TransportAbstraction.IHost) this.host).StorageLayer.CreatePartitionState(parameters); + this.TraceHelper.TracePartitionProgress("Starting", ref this.LastTransition, this.CurrentTimeMs, ""); - // initialize timer for this partition - this.PendingTimers = new BatchTimer(this.ErrorHandler.Token, this.TimersFired); + (long, int) inputQueuePosition; - // goes to storage to create or restore the partition state - var inputQueuePosition = await this.State.CreateOrRestoreAsync(this, this.ErrorHandler, inputQueueFingerprint).ConfigureAwait(false); + await using (new PartitionTimeout(errorHandler, "partition startup", TimeSpan.FromMinutes(this.Settings.PartitionStartupTimeoutMinutes))) + { + // create or restore partition state from last snapshot + // create the state + this.State = ((TransportAbstraction.IHost)this.host).StorageLayer.CreatePartitionState(parameters); - // start processing the timers - this.PendingTimers.Start($"Timer{this.PartitionId:D2}"); + // initialize timer for this partition + this.PendingTimers = new BatchTimer(this.ErrorHandler.Token, this.TimersFired); - // start processing the worker queues - this.State.StartProcessing(); + // goes to storage to create or restore the partition state + inputQueuePosition = await this.State.CreateOrRestoreAsync(this, this.ErrorHandler, inputQueueFingerprint).ConfigureAwait(false); + + // start processing the timers + this.PendingTimers.Start($"Timer{this.PartitionId:D2}"); + + // start processing the worker queues + this.State.StartProcessing(); + } this.TraceHelper.TracePartitionProgress("Started", ref this.LastTransition, this.CurrentTimeMs, $"nextInputQueuePosition={inputQueuePosition.Item1}.{inputQueuePosition.Item2}"); return inputQueuePosition; diff --git a/src/DurableTask.Netherite/OrchestrationService/PartitionErrorHandler.cs b/src/DurableTask.Netherite/OrchestrationService/PartitionErrorHandler.cs index 7d5a74de..e4cf00db 100644 --- a/src/DurableTask.Netherite/OrchestrationService/PartitionErrorHandler.cs +++ b/src/DurableTask.Netherite/OrchestrationService/PartitionErrorHandler.cs @@ -4,6 +4,7 @@ namespace DurableTask.Netherite { using System; + using System.Collections.Generic; using System.Net; using System.Threading; using System.Threading.Tasks; @@ -22,8 +23,7 @@ class PartitionErrorHandler : IPartitionErrorHandler readonly string taskHub; readonly TaskCompletionSource shutdownComplete; readonly TransportAbstraction.IHost host; - - public event Action OnShutdown; + readonly List disposeTasks; public CancellationToken Token { @@ -44,6 +44,11 @@ public CancellationToken Token public bool NormalTermination => this.terminationStatus == TerminatedNormally; + public bool WaitForDisposeTasks(TimeSpan timeout) + { + return Task.WhenAll(this.disposeTasks).Wait(timeout); + } + volatile int terminationStatus = NotTerminated; const int NotTerminated = 0; const int TerminatedWithError = 1; @@ -59,6 +64,7 @@ public PartitionErrorHandler(int partitionId, ILogger logger, LogLevel logLevelL this.taskHub = taskHubName; this.shutdownComplete = new TaskCompletionSource(); this.host = host; + this.disposeTasks = new List(); } public void HandleError(string context, string message, Exception exception, bool terminatePartition, bool isWarning) @@ -119,9 +125,8 @@ void Terminate() { try { - this.logger?.LogDebug("Part{partition:D2} Started PartitionCancellation", this.partitionId); + // this immediately cancels all activities depending on the error handler token this.cts.Cancel(); - this.logger?.LogDebug("Part{partition:D2} Completed PartitionCancellation", this.partitionId); } catch (AggregateException aggregate) { @@ -134,47 +139,45 @@ void Terminate() { this.HandleError("PartitionErrorHandler.Terminate", "Exception in PartitionCancellation", e, false, true); } + finally + { + // now that the partition is dead, run all the dispose tasks + Task.Run(this.DisposeAsync); + } + } - // we use a dedicated shutdown thread to help debugging and to contain damage if there are hangs - Thread shutdownThread = TrackedThreads.MakeTrackedThread(Shutdown, "PartitionShutdown"); - shutdownThread.Start(); - - void Shutdown() + public void AddDisposeTask(string name, TimeSpan timeout, Action action) + { + this.disposeTasks.Add(new Task(() => { + Task disposeTask = Task.Run(action); try { - this.logger?.LogDebug("Part{partition:D2} Started PartitionShutdown", this.partitionId); - - if (this.OnShutdown != null) + bool completedInTime = disposeTask.Wait(timeout); + if (!completedInTime) { - this.OnShutdown(); - } - - this.cts.Dispose(); - - this.logger?.LogDebug("Part{partition:D2} Completed PartitionShutdown", this.partitionId); - } - catch (AggregateException aggregate) - { - foreach (var e in aggregate.InnerExceptions) - { - this.HandleError("PartitionErrorHandler.Shutdown", "Exception in PartitionShutdown", e, false, true); + this.HandleError("PartitionErrorHandler.DisposeAsync", $"Dispose Task {name} timed out after {timeout}", null, false, false); } } - catch (Exception e) + catch(Exception exception) { - this.HandleError("PartitionErrorHandler.Shutdown", "Exception in PartitionShutdown", e, false, true); + this.HandleError("PartitionErrorHandler.DisposeAsync", $"Dispose Task {name} threw exception {exception}", null, false, false); } - - this.shutdownComplete.TrySetResult(null); - } + })); } - public async Task WaitForTermination(TimeSpan timeout) + async Task DisposeAsync() { - Task timeoutTask = Task.Delay(timeout); - var first = await Task.WhenAny(timeoutTask, this.shutdownComplete.Task); - return first == this.shutdownComplete.Task; + // execute all the dispose tasks in parallel + var tasks = this.disposeTasks; + foreach (var task in tasks) + { + task.Start(); + } + await Task.WhenAll(tasks); + + // we can now dispose the cancellation token source itself + this.cts.Dispose(); } } } diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/AzureStorageDevice.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/AzureStorageDevice.cs index b963a914..0dbe9c6c 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/AzureStorageDevice.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/AzureStorageDevice.cs @@ -295,7 +295,7 @@ public override void RemoveSegmentAsync(int segment, AsyncCallback callback, IAs this.underLease, "BlobBaseClient.DeleteAsync", "DeleteDeviceSegment", - "", + $"id={id}", entry.PageBlob.Default.Name, 5000, true, @@ -448,13 +448,14 @@ async Task WritePortionToBlobAsync(UnmanagedMemoryStream stream, BlobEntry blobE { using (stream) { + var position = destinationAddress + offset; long originalStreamPosition = stream.Position; await this.BlobManager.PerformWithRetriesAsync( BlobManager.AsynchronousStorageWriteMaxConcurrency, true, "PageBlobClient.UploadPagesAsync", "WriteToDevice", - $"id={id} length={length} destinationAddress={destinationAddress + offset}", + $"id={id} position={position} length={length}", blobEntry.PageBlob.Default.Name, 1000 + (int)length / 1000, true, @@ -498,11 +499,15 @@ unsafe Task ReadFromBlobUnsafeAsync(BlobUtilsV12.PageBlobClients blob, long sour async Task ReadFromBlobAsync(UnmanagedMemoryStream stream, BlobUtilsV12.PageBlobClients blob, long sourceAddress, uint readLength, long id) { + long readRangeStart = sourceAddress; + long readRangeEnd = readRangeStart + readLength; + string operationReadRange = $"[{readRangeStart}, {readRangeEnd}]"; using (stream) { long offset = 0; while (readLength > 0) { + var position = sourceAddress + offset; var length = Math.Min(readLength, MAX_DOWNLOAD_SIZE); await this.BlobManager.PerformWithRetriesAsync( @@ -510,7 +515,7 @@ await this.BlobManager.PerformWithRetriesAsync( true, "PageBlobClient.DownloadStreamingAsync", "ReadFromDevice", - $"id={id} readLength={length} sourceAddress={sourceAddress + offset}", + $"id={id} position={position} length={length} operationReadRange={operationReadRange}", blob.Default.Name, 1000 + (int)length / 1000, true, diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs index e26089e3..453c4911 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs @@ -677,6 +677,8 @@ public async Task RenewLeaseTask() public async Task MaintenanceLoopAsync() { + bool releaseLeaseAtEnd = !this.UseLocalFiles; + this.TraceHelper.LeaseProgress("Started lease maintenance loop"); try { @@ -711,6 +713,7 @@ public async Task MaintenanceLoopAsync() { // We lost the lease to someone else. Terminate ownership immediately. this.PartitionErrorHandler.HandleError(nameof(MaintenanceLoopAsync), "Lost partition lease", ex, true, true); + releaseLeaseAtEnd = false; } catch (Exception e) { @@ -729,24 +732,22 @@ public async Task MaintenanceLoopAsync() this.TraceHelper.LeaseProgress("Waited for lease users to complete"); // release the lease - if (!this.UseLocalFiles) + if (releaseLeaseAtEnd) { try { this.TraceHelper.LeaseProgress("Releasing lease"); this.FaultInjector?.StorageAccess(this, "ReleaseLeaseAsync", "ReleaseLease", this.eventLogCommitBlob.Name); - await this.leaseClient.ReleaseAsync(null, this.PartitionErrorHandler.Token).ConfigureAwait(false); + + // we must always release the lease here, whether the partition has already been terminated or not. + // otherwise, the recovery of this partition (on this host or another host) has to wait for up + // to 40s for the lease to expire. During this time, the partition is stalled (cannot process any work + // items or client requests). In particular, client requests targeting this partition may be heavily delayed. + await this.leaseClient.ReleaseAsync(conditions: null, CancellationToken.None).ConfigureAwait(false); + this.TraceHelper.LeaseReleased(this.leaseTimer.Elapsed.TotalSeconds); } - catch (OperationCanceledException) - { - // it's o.k. if termination is triggered while waiting - } - catch (Azure.RequestFailedException e) when (e.InnerException != null && e.InnerException is OperationCanceledException) - { - // it's o.k. if termination is triggered while we are releasing the lease - } catch (Exception e) { // we swallow, but still report exceptions when releasing a lease @@ -754,9 +755,11 @@ public async Task MaintenanceLoopAsync() } } + this.TraceHelper.FasterProgress("Blob manager is terminating partition normally"); + this.PartitionErrorHandler.TerminateNormally(); - this.TraceHelper.LeaseProgress("Blob manager stopped"); + this.TraceHelper.FasterProgress("Blob manager stopped"); } public async Task RemoveObsoleteCheckpoints() @@ -1115,23 +1118,23 @@ void ICheckpointManager.CommitIndexCheckpoint(Guid indexToken, byte[] commitMeta var metaFileBlob = partDir.GetBlockBlobClient(this.GetIndexCheckpointMetaBlobName(indexToken)); this.PerformWithRetries( - false, - "BlockBlobClient.OpenWrite", - "WriteIndexCheckpointMetadata", - $"token={indexToken} size={commitMetadata.Length}", - metaFileBlob.Name, - 1000, - true, - (numAttempts) => - { - var client = metaFileBlob.WithRetries; - using var blobStream = client.OpenWrite(overwrite: true); - using var writer = new BinaryWriter(blobStream); - writer.Write(commitMetadata.Length); - writer.Write(commitMetadata); - writer.Flush(); - return (commitMetadata.Length, true); - }); + false, + "BlockBlobClient.OpenWrite", + "WriteIndexCheckpointMetadata", + $"token={indexToken} size={commitMetadata.Length}", + metaFileBlob.Name, + 1000, + true, + (numAttempts) => + { + var client = metaFileBlob.WithRetries; + using var blobStream = client.OpenWrite(overwrite: true); + using var writer = new BinaryWriter(blobStream); + writer.Write(commitMetadata.Length); + writer.Write(commitMetadata); + writer.Flush(); + return (commitMetadata.Length, true); + }); this.CheckpointInfo.IndexToken = indexToken; this.StorageTracer?.FasterStorageProgress($"StorageOpReturned ICheckpointManager.CommitIndexCheckpoint, target={metaFileBlob.Name}"); diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/StorageOperations.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/StorageOperations.cs index 952aec51..a07acbb1 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/StorageOperations.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/StorageOperations.cs @@ -21,7 +21,7 @@ public async Task PerformWithRetriesAsync( bool requireLease, string name, string intent, - string data, + string details, string target, int expectedLatencyBound, bool isCritical, @@ -59,7 +59,7 @@ public async Task PerformWithRetriesAsync( this.PartitionErrorHandler.Token.ThrowIfCancellationRequested(); - this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) started attempt {numAttempts}; target={target} {data}"); + this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) started attempt {numAttempts}; target={target} {details}"); stopwatch.Restart(); @@ -68,14 +68,14 @@ public async Task PerformWithRetriesAsync( long size = await operationAsync(numAttempts).ConfigureAwait(false); stopwatch.Stop(); - this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) succeeded on attempt {numAttempts}; target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} {data}"); + this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) succeeded on attempt {numAttempts}; target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} {details}"); if (stopwatch.ElapsedMilliseconds > expectedLatencyBound) { - this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) took {stopwatch.Elapsed.TotalSeconds:F1}s on attempt {numAttempts}, which is excessive; {data}"); + this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) took {stopwatch.Elapsed.TotalSeconds:F1}s on attempt {numAttempts}, which is excessive; {details}"); } - this.TraceHelper.FasterAzureStorageAccessCompleted(intent, size, name, target, stopwatch.Elapsed.TotalMilliseconds, numAttempts); + this.TraceHelper.FasterAzureStorageAccessCompleted(intent, size, name, details, target, stopwatch.Elapsed.TotalMilliseconds, numAttempts); return; } @@ -91,7 +91,7 @@ public async Task PerformWithRetriesAsync( if (BlobUtils.IsTimeout(e)) { - this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) timed out on attempt {numAttempts} after {stopwatch.Elapsed.TotalSeconds:F1}s, retrying now; target={target} {data}"); + this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) timed out on attempt {numAttempts} after {stopwatch.Elapsed.TotalSeconds:F1}s, retrying now; target={target} {details}"); } else { @@ -103,7 +103,7 @@ public async Task PerformWithRetriesAsync( } catch (Azure.RequestFailedException ex) when (BlobUtilsV12.PreconditionFailed(ex) && readETagAsync != null) { - this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) failed precondition on attempt {numAttempts}; target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} {data}"); + this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) failed precondition on attempt {numAttempts}; target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} {details}"); mustReadETagFirst = true; continue; } @@ -134,7 +134,7 @@ public void PerformWithRetries( bool requireLease, string name, string intent, - string data, + string details, string target, int expectedLatencyBound, bool isCritical, @@ -156,7 +156,7 @@ public void PerformWithRetries( this.PartitionErrorHandler.Token.ThrowIfCancellationRequested(); - this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) started attempt {numAttempts}; target={target} {data}"); + this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) started attempt {numAttempts}; target={target} {details}"); stopwatch.Restart(); this.FaultInjector?.StorageAccess(this, name, intent, target); @@ -169,13 +169,13 @@ public void PerformWithRetries( } stopwatch.Stop(); - this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) succeeded on attempt {numAttempts}; target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} size={size} {data} "); + this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) succeeded on attempt {numAttempts}; target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} size={size} {details} "); - this.TraceHelper.FasterAzureStorageAccessCompleted(intent, size, name, target, stopwatch.Elapsed.TotalMilliseconds, numAttempts); + this.TraceHelper.FasterAzureStorageAccessCompleted(intent, size, name, details, target, stopwatch.Elapsed.TotalMilliseconds, numAttempts); if (stopwatch.ElapsedMilliseconds > expectedLatencyBound) { - this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) took {stopwatch.Elapsed.TotalSeconds:F1}s on attempt {numAttempts}, which is excessive; {data}"); + this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) took {stopwatch.Elapsed.TotalSeconds:F1}s on attempt {numAttempts}, which is excessive; {details}"); } return; @@ -191,7 +191,7 @@ public void PerformWithRetries( stopwatch.Stop(); if (BlobUtils.IsTimeout(e)) { - this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) timed out on attempt {numAttempts} after {stopwatch.Elapsed.TotalSeconds:F1}s, retrying now; target={target} {data}"); + this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) timed out on attempt {numAttempts} after {stopwatch.Elapsed.TotalSeconds:F1}s, retrying now; target={target} {details}"); } else { diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs index 00d26c87..135dd4f4 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs @@ -10,6 +10,7 @@ namespace DurableTask.Netherite.Faster using System.IO; using System.Linq; using System.Runtime.CompilerServices; + using System.Runtime.Serialization; using System.Text; using System.Threading; using System.Threading.Channels; @@ -97,8 +98,8 @@ public FasterKV(Partition partition, BlobManager blobManager, MemoryTracker memo blobManager.StoreCheckpointSettings, new SerializerSettings { - keySerializer = () => new Key.Serializer(), - valueSerializer = () => new Value.Serializer(this.StoreStats, partition.TraceHelper, this.cacheDebugger), + keySerializer = () => new Key.Serializer(partition.ErrorHandler), + valueSerializer = () => new Value.Serializer(this.StoreStats, partition.TraceHelper, this.cacheDebugger, partition.ErrorHandler), }); this.cacheTracker = memoryTracker.NewCacheTracker(this, (int) partition.PartitionId, this.cacheDebugger); @@ -112,7 +113,7 @@ public FasterKV(Partition partition, BlobManager blobManager, MemoryTracker memo partition.Assert(this.fht.ReadCache == null, "Unexpected read cache"); this.terminationToken = partition.ErrorHandler.Token; - partition.ErrorHandler.OnShutdown += this.Shutdown; + partition.ErrorHandler.AddDisposeTask($"{nameof(FasterKV)}.{nameof(this.Dispose)}", TimeSpan.FromMinutes(2), this.Dispose); this.compactionStopwatch = new Stopwatch(); this.compactionStopwatch.Start(); @@ -122,58 +123,53 @@ public FasterKV(Partition partition, BlobManager blobManager, MemoryTracker memo this.blobManager.TraceHelper.FasterProgress("Constructed FasterKV"); } - void Shutdown() + void Dispose() { - try + this.TraceHelper.FasterProgress("Disposing CacheTracker"); + this.cacheTracker?.Dispose(); + + foreach (var s in this.sessionsToDisposeOnShutdown) { - this.TraceHelper.FasterProgress("Disposing CacheTracker"); - this.cacheTracker?.Dispose(); + this.TraceHelper.FasterStorageProgress($"Disposing Temporary Session"); + s.Dispose(); + } - foreach (var s in this.sessionsToDisposeOnShutdown) - { - this.TraceHelper.FasterStorageProgress($"Disposing Temporary Session"); - s.Dispose(); - } + this.TraceHelper.FasterProgress("Disposing Main Session"); + try + { + this.mainSession?.Dispose(); + } + catch (OperationCanceledException) + { + // can happen during shutdown + } - this.TraceHelper.FasterProgress("Disposing Main Session"); + this.TraceHelper.FasterProgress("Disposing Query Sessions"); + foreach (var s in this.querySessions) + { try { - this.mainSession?.Dispose(); + s?.Dispose(); } - catch(OperationCanceledException) + catch (OperationCanceledException) { // can happen during shutdown } + } - this.TraceHelper.FasterProgress("Disposing Query Sessions"); - foreach (var s in this.querySessions) - { - try - { - s?.Dispose(); - } - catch (OperationCanceledException) - { - // can happen during shutdown - } - } - - this.TraceHelper.FasterProgress("Disposing FasterKV"); - this.fht.Dispose(); + this.TraceHelper.FasterProgress("Disposing FasterKV"); + this.fht.Dispose(); - this.TraceHelper.FasterProgress($"Disposing Devices"); - this.blobManager.DisposeDevices(); + this.TraceHelper.FasterProgress($"Disposing Devices"); + this.blobManager.DisposeDevices(); - if (this.blobManager.FaultInjector != null) - { - this.TraceHelper.FasterProgress($"Unregistering from FaultInjector"); - this.blobManager.FaultInjector.Disposed(this.blobManager); - } - } - catch (Exception e) + if (this.blobManager.FaultInjector != null) { - this.blobManager.TraceHelper.FasterStorageError("Disposing FasterKV", e); + this.TraceHelper.FasterProgress($"Unregistering from FaultInjector"); + this.blobManager.FaultInjector.Disposed(this.blobManager); } + + this.TraceHelper.FasterProgress("Disposed FasterKV"); } double GetElapsedCompactionMilliseconds() @@ -255,8 +251,9 @@ public override Task FindCheckpointAsync(bool logIsEmpty) } // recover Faster - this.blobManager.TraceHelper.FasterProgress($"Recovering FasterKV"); + this.blobManager.TraceHelper.FasterProgress($"Recovering FasterKV - Entering fht.RecoverAsync"); await this.fht.RecoverAsync(this.partition.Settings.FasterTuningParameters?.NumPagesToPreload ?? 1, true, -1, this.terminationToken); + this.blobManager.TraceHelper.FasterProgress($"Recovering FasterKV - Returned from fht.RecoverAsync"); this.mainSession = this.CreateASession($"main-{this.blobManager.IncarnationTimestamp:o}", false); for (int i = 0; i < this.querySessions.Length; i++) { @@ -1594,13 +1591,51 @@ public bool Equals(ref Key k1, ref Key k2) public class Serializer : BinaryObjectSerializer { + readonly IPartitionErrorHandler errorHandler; + + public Serializer(IPartitionErrorHandler errorHandler) + { + this.errorHandler = errorHandler; + } + public override void Deserialize(out Key obj) { - obj = new Key(); - obj.Val.Deserialize(this.reader); + try + { + if (!this.errorHandler.IsTerminated) // skip deserialization if the partition is already terminated - to speed up cancellation and to avoid repeated errors + { + // first, determine the object type + var objectType = (TrackedObjectKey.TrackedObjectType)this.reader.ReadByte(); + if (objectType != TrackedObjectKey.TrackedObjectType.History + && objectType != TrackedObjectKey.TrackedObjectType.Instance) + { + throw new SerializationException("invalid object type field"); + } + var instanceId = this.reader.ReadString(); + obj = new TrackedObjectKey(objectType, instanceId); + return; + } + } + catch (Exception ex) + { + this.errorHandler.HandleError("FasterKV.Key.Serializer", "could not deserialize key - possible data corruption", ex, true, !this.errorHandler.IsTerminated); + } + + obj = default; } - public override void Serialize(ref Key obj) => obj.Val.Serialize(this.writer); + public override void Serialize(ref Key obj) + { + try + { + this.writer.Write((byte)obj.Val.ObjectType); + this.writer.Write(obj.Val.InstanceId); + } + catch (Exception ex) + { + this.errorHandler.HandleError("FasterKV.Key.Serializer", "could not serialize key", ex, true, false); + } + } } } @@ -1624,49 +1659,77 @@ public class Serializer : BinaryObjectSerializer readonly StoreStatistics storeStats; readonly PartitionTraceHelper traceHelper; readonly CacheDebugger cacheDebugger; + readonly IPartitionErrorHandler errorHandler; - public Serializer(StoreStatistics storeStats, PartitionTraceHelper traceHelper, CacheDebugger cacheDebugger) + public Serializer(StoreStatistics storeStats, PartitionTraceHelper traceHelper, CacheDebugger cacheDebugger, IPartitionErrorHandler errorHandler) { this.storeStats = storeStats; this.traceHelper = traceHelper; this.cacheDebugger = cacheDebugger; + this.errorHandler = errorHandler; } public override void Deserialize(out Value obj) { - int version = this.reader.ReadInt32(); - int count = this.reader.ReadInt32(); - byte[] bytes = this.reader.ReadBytes(count); // lazy deserialization - keep as byte array until used - obj = new Value { Val = bytes, Version = version}; - if (this.cacheDebugger != null) + try + { + if (!this.errorHandler.IsTerminated) // skip deserialization if the partition is already terminated - to speed up cancellation and to avoid repeated errors + { + int version = this.reader.ReadInt32(); + int count = this.reader.ReadInt32(); + byte[] bytes = this.reader.ReadBytes(count); // lazy deserialization - keep as byte array until used + + if (bytes.Length != count) + { + throw new EndOfStreamException($"trying to read {count} bytes but only found {bytes.Length}"); + } + + obj = new Value { Val = bytes, Version = version}; + if (this.cacheDebugger != null) + { + var trackedObject = DurableTask.Netherite.Serializer.DeserializeTrackedObject(bytes); + this.cacheDebugger?.Record(trackedObject.Key, CacheDebugger.CacheEvent.DeserializeBytes, version, null, 0); + } + + return; + } + } + catch (Exception ex) { - var trackedObject = DurableTask.Netherite.Serializer.DeserializeTrackedObject(bytes); - this.cacheDebugger?.Record(trackedObject.Key, CacheDebugger.CacheEvent.DeserializeBytes, version, null, 0); + this.errorHandler.HandleError("FasterKV.Value.Serializer", "could not deserialize value - possible data corruption", ex, true, !this.errorHandler.IsTerminated); } + obj = default; } public override void Serialize(ref Value obj) { - this.writer.Write(obj.Version); - if (obj.Val is byte[] serialized) + try { - // We did already serialize this object on the last CopyUpdate. So we can just use the byte array. - this.writer.Write(serialized.Length); - this.writer.Write(serialized); - if (this.cacheDebugger != null) + this.writer.Write(obj.Version); + if (obj.Val is byte[] serialized) + { + // We did already serialize this object on the last CopyUpdate. So we can just use the byte array. + this.writer.Write(serialized.Length); + this.writer.Write(serialized); + if (this.cacheDebugger != null) + { + var trackedObject = DurableTask.Netherite.Serializer.DeserializeTrackedObject(serialized); + this.cacheDebugger?.Record(trackedObject.Key, CacheDebugger.CacheEvent.SerializeBytes, obj.Version, null, 0); + } + } + else { - var trackedObject = DurableTask.Netherite.Serializer.DeserializeTrackedObject(serialized); - this.cacheDebugger?.Record(trackedObject.Key, CacheDebugger.CacheEvent.SerializeBytes, obj.Version, null, 0); + TrackedObject trackedObject = (TrackedObject) obj.Val; + var bytes = DurableTask.Netherite.Serializer.SerializeTrackedObject(trackedObject); + this.storeStats.Serialize++; + this.writer.Write(bytes.Length); + this.writer.Write(bytes); + this.cacheDebugger?.Record(trackedObject.Key, CacheDebugger.CacheEvent.SerializeObject, obj.Version, null, 0); } } - else + catch (Exception ex) { - TrackedObject trackedObject = (TrackedObject) obj.Val; - var bytes = DurableTask.Netherite.Serializer.SerializeTrackedObject(trackedObject); - this.storeStats.Serialize++; - this.writer.Write(bytes.Length); - this.writer.Write(bytes); - this.cacheDebugger?.Record(trackedObject.Key, CacheDebugger.CacheEvent.SerializeObject, obj.Version, null, 0); + this.errorHandler.HandleError("FasterKV.Value.Serializer", "could not serialize value", ex, true, false); } } } diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterLog.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterLog.cs index 20949733..cad4e5a2 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterLog.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterLog.cs @@ -20,24 +20,19 @@ public FasterLog(BlobManager blobManager, NetheriteOrchestrationServiceSettings this.blobManager = blobManager; var eventlogsettings = blobManager.GetEventLogSettings(settings.UseSeparatePageBlobStorage, settings.FasterTuningParameters); this.log = new FASTER.core.FasterLog(eventlogsettings); - blobManager.PartitionErrorHandler.OnShutdown += this.Shutdown; + blobManager.PartitionErrorHandler.AddDisposeTask($"{nameof(FasterLog)}.{nameof(this.Dispose)}", TimeSpan.FromMinutes(2), this.Dispose); this.terminationToken = blobManager.PartitionErrorHandler.Token; } - - void Shutdown() + + void Dispose() { - try - { - this.blobManager.TraceHelper.FasterProgress("Disposing FasterLog"); - this.log.Dispose(); + this.blobManager.TraceHelper.FasterProgress("Disposing FasterLog"); + this.log.Dispose(); - this.blobManager.TraceHelper.FasterProgress("Disposing FasterLog Device"); - this.blobManager.EventLogDevice.Dispose(); - } - catch (Exception e) - { - this.blobManager.TraceHelper.FasterStorageError("Disposing FasterLog", e); - } + this.blobManager.TraceHelper.FasterProgress("Disposing FasterLog Device"); + this.blobManager.EventLogDevice.Dispose(); + + this.blobManager.TraceHelper.FasterProgress("Disposed FasterLog"); } public long BeginAddress => this.log.BeginAddress; diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterTraceHelper.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterTraceHelper.cs index 63d0bb86..e42650f9 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterTraceHelper.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterTraceHelper.cs @@ -146,13 +146,13 @@ public void FasterStorageProgress(string details) } } - public void FasterAzureStorageAccessCompleted(string intent, long size, string operation, string target, double latency, int attempt) + public void FasterAzureStorageAccessCompleted(string intent, long size, string operation, string details, string target, double latency, int attempt) { if (this.logLevelLimit <= LogLevel.Debug) { - this.logger.LogDebug("Part{partition:D2} storage access completed intent={intent} size={size} operation={operation} target={target} latency={latency} attempt={attempt}", - this.partitionId, intent, size, operation, target, latency, attempt); - EtwSource.Log.FasterAzureStorageAccessCompleted(this.account, this.taskHub, this.partitionId, intent, size, operation, target, latency, attempt, TraceUtils.AppName, TraceUtils.ExtensionVersion); + this.logger.LogDebug("Part{partition:D2} storage access completed intent={intent} size={size} operation={operation} {details} target={target} latency={latency} attempt={attempt}", + this.partitionId, intent, size, operation, details, target, latency, attempt); + EtwSource.Log.FasterAzureStorageAccessCompleted(this.account, this.taskHub, this.partitionId, intent, size, operation, details, target, latency, attempt, TraceUtils.AppName, TraceUtils.ExtensionVersion); } } diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs b/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs index 18dddf85..f94e465a 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs @@ -115,7 +115,7 @@ async Task TerminationWrapper(Task what) if (this.partition.Settings.TestHooks?.ReplayChecker == null) { this.hangCheckTimer = new Timer(this.CheckForStuckWorkers, null, 0, 20000); - errorHandler.OnShutdown += () => this.hangCheckTimer.Dispose(); + errorHandler.AddDisposeTask("DisposeHangTimer", TimeSpan.FromSeconds(10), () => this.hangCheckTimer.Dispose()); } bool hasCheckpoint = false; diff --git a/src/DurableTask.Netherite/Tracing/EtwSource.cs b/src/DurableTask.Netherite/Tracing/EtwSource.cs index 9b90bebb..2d5cf6d9 100644 --- a/src/DurableTask.Netherite/Tracing/EtwSource.cs +++ b/src/DurableTask.Netherite/Tracing/EtwSource.cs @@ -378,11 +378,11 @@ public void FasterStorageProgress(string Account, string TaskHub, int PartitionI this.WriteEvent(265, Account, TaskHub, PartitionId, Details, AppName, ExtensionVersion); } - [Event(266, Level = EventLevel.Verbose, Version = 1)] - public void FasterAzureStorageAccessCompleted(string Account, string TaskHub, int PartitionId, string Intent, long Size, string Operation, string Target, double Latency, int Attempt, string AppName, string ExtensionVersion) + [Event(266, Level = EventLevel.Verbose, Version = 3)] + public void FasterAzureStorageAccessCompleted(string Account, string TaskHub, int PartitionId, string Intent,long Size, string Operation, string Details, string Target, double Latency, int Attempt, string AppName, string ExtensionVersion) { SetCurrentThreadActivityId(serviceInstanceId); - this.WriteEvent(266, Account, TaskHub, PartitionId, Intent, Size, Operation, Target, Latency, Attempt, AppName, ExtensionVersion); + this.WriteEvent(266, Account, TaskHub, PartitionId, Intent, Size, Operation, Details, Target, Latency, Attempt, AppName, ExtensionVersion); } [Event(267, Level = EventLevel.Warning, Version = 1)] diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsProcessor.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsProcessor.cs index 78acf837..6c451995 100644 --- a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsProcessor.cs +++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsProcessor.cs @@ -180,18 +180,13 @@ async Task StartPartitionAsync(PartitionIncarnation prior await Task.Delay(addedDelay); } - // we wait at most 20 seconds for the previous partition to terminate cleanly - int tries = 4; - var timeout = TimeSpan.FromSeconds(5); + // the previous incarnation has already been terminated. But it may not have cleaned up yet. + // We wait (but no more than 20 seconds) for the previous partition to dispose its assets + bool disposalComplete = prior.ErrorHandler.WaitForDisposeTasks(TimeSpan.FromSeconds(20)); - while (!await prior.ErrorHandler.WaitForTermination(timeout)) + if (!disposalComplete) { - this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} partition (incarnation {incarnation}) is still waiting for PartitionShutdown of previous incarnation", this.eventHubName, this.eventHubPartition, c.Incarnation); - - if (--tries == 0) - { - break; - } + this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} partition (incarnation {incarnation}) timed out waiting for disposal of previous incarnation", this.eventHubName, this.eventHubPartition, c.Incarnation); } } } @@ -437,7 +432,7 @@ async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumera // iterators do not support ref arguments, so we use a simple wrapper class to work around this limitation MutableLong nextPacketToReceive = new MutableLong() { Value = current.NextPacketToReceive.seqNo }; - await foreach ((EventData eventData, PartitionEvent[] events, long seqNo) in this.blobBatchReceiver.ReceiveEventsAsync(this.taskHubGuid, packets, current.ErrorHandler.Token, nextPacketToReceive)) + await foreach ((EventData eventData, PartitionEvent[] events, long seqNo) in this.blobBatchReceiver.ReceiveEventsAsync(this.taskHubGuid, packets, this.shutdownToken, nextPacketToReceive)) { for (int i = 0; i < events.Length; i++) { @@ -489,7 +484,7 @@ async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumera await this.SaveEventHubsReceiverCheckpoint(context, 600000); } - catch (OperationCanceledException) + catch (OperationCanceledException) when (this.shutdownToken.IsCancellationRequested) // we should only ignore these exceptions during VM shutdowns. See : https://github.com/microsoft/durabletask-netherite/pull/347 { this.traceHelper.LogInformation("EventHubsProcessor {eventHubName}/{eventHubPartition}({incarnation}) was terminated", this.eventHubName, this.eventHubPartition, current.Incarnation); } diff --git a/src/DurableTask.Netherite/TransportLayer/SingleHost/PartitionQueue.cs b/src/DurableTask.Netherite/TransportLayer/SingleHost/PartitionQueue.cs index 22eb20a2..e8ed541e 100644 --- a/src/DurableTask.Netherite/TransportLayer/SingleHost/PartitionQueue.cs +++ b/src/DurableTask.Netherite/TransportLayer/SingleHost/PartitionQueue.cs @@ -78,14 +78,14 @@ protected override async Task Process(IList batch) { this.Partition = this.host.AddPartition(this.partitionId, this.sender); var errorHandler = this.host.CreateErrorHandler(this.partitionId); - errorHandler.OnShutdown += () => + errorHandler.AddDisposeTask("PartitionQueue.Termination", TimeSpan.FromSeconds(10), () => { if (!this.isShuttingDown && this.testHooks?.FaultInjectionActive != true) { this.testHooks?.Error("MemoryTransport", "Unexpected partition termination"); } this.Notify(); - }; + }); var (nextInputQueuePosition, _) = await this.Partition.CreateOrRestoreAsync(errorHandler, this.parameters, this.fingerPrint); diff --git a/src/DurableTask.Netherite/Util/BlobUtils.cs b/src/DurableTask.Netherite/Util/BlobUtils.cs index 6cc94e72..d181d97e 100644 --- a/src/DurableTask.Netherite/Util/BlobUtils.cs +++ b/src/DurableTask.Netherite/Util/BlobUtils.cs @@ -100,6 +100,12 @@ public static bool IsTransientStorageError(Exception exception) return true; } + // Empirically observed: transient DNS failures + if (exception is Azure.RequestFailedException && exception.InnerException is System.Net.Http.HttpRequestException e2 && e2.Message.Contains("No such host is known")) + { + return true; + } + return false; } diff --git a/src/DurableTask.Netherite/Util/PartitionTimeout.cs b/src/DurableTask.Netherite/Util/PartitionTimeout.cs new file mode 100644 index 00000000..e4c32b6b --- /dev/null +++ b/src/DurableTask.Netherite/Util/PartitionTimeout.cs @@ -0,0 +1,59 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace DurableTask.Netherite +{ + using System; + using System.Collections.Generic; + using System.Text; + using System.Threading; + using System.Threading.Tasks; + + /// + /// A utility class for terminating the partition if some task takes too long. + /// Implemented as a disposable, with used to cancel the timeout. + /// + class PartitionTimeout : IAsyncDisposable + { + readonly CancellationTokenSource tokenSource; + readonly Task timeoutTask; + + public PartitionTimeout(IPartitionErrorHandler errorHandler, string task, TimeSpan timeout) + { + this.tokenSource = new CancellationTokenSource(); + this.timeoutTask = Task.Run(async () => + { + try + { + await Task.Delay(timeout, this.tokenSource.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + // we did not time out + return; + } + + errorHandler.HandleError( + $"{nameof(PartitionTimeout)}", + $"{task} timed out after {timeout}", + e: null, + terminatePartition: true, + reportAsWarning: false); + }); + } + + public async ValueTask DisposeAsync() + { + // cancel the timeout task (if it has not already completed) + this.tokenSource.Cancel(); + + // wait for the timeouttask to complete here, so we can be sure that the + // decision about the timeout firing or not firing has been made + // before we leave this method + await this.timeoutTask.ConfigureAwait(false); + + // we can dispose the token source now since the timeoutTask is completed + this.tokenSource.Dispose(); + } + } +} diff --git a/src/Functions.Worker.Extensions.DurableTask.Netherite/Functions.Worker.Extensions.DurableTask.Netherite.csproj b/src/Functions.Worker.Extensions.DurableTask.Netherite/Functions.Worker.Extensions.DurableTask.Netherite.csproj index 01c5f1e4..99494417 100644 --- a/src/Functions.Worker.Extensions.DurableTask.Netherite/Functions.Worker.Extensions.DurableTask.Netherite.csproj +++ b/src/Functions.Worker.Extensions.DurableTask.Netherite/Functions.Worker.Extensions.DurableTask.Netherite.csproj @@ -27,7 +27,7 @@ 1 4 - 1 + 2 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(MajorVersion).0.0.0 diff --git a/src/Functions.Worker.Extensions.DurableTask.Netherite/Properties/AssemblyInfo.cs b/src/Functions.Worker.Extensions.DurableTask.Netherite/Properties/AssemblyInfo.cs index 2c6f5f7d..0e88a41a 100644 --- a/src/Functions.Worker.Extensions.DurableTask.Netherite/Properties/AssemblyInfo.cs +++ b/src/Functions.Worker.Extensions.DurableTask.Netherite/Properties/AssemblyInfo.cs @@ -4,4 +4,4 @@ using Microsoft.Azure.Functions.Worker.Extensions.Abstractions; // This must be updated when updating the version of the package -[assembly: ExtensionInformation("Microsoft.Azure.DurableTask.Netherite.AzureFunctions", "1.4.1", true)] \ No newline at end of file +[assembly: ExtensionInformation("Microsoft.Azure.DurableTask.Netherite.AzureFunctions", "1.4.2", true)] \ No newline at end of file