Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve error handling for FASTER IO completion callbacks #349

Merged
merged 7 commits into from
Mar 18, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -243,15 +243,33 @@ void CancelAllRequests()
{
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id} (Canceled)");
}
request.Callback(uint.MaxValue, request.NumBytes, request.Context);

try
{
request.Callback(uint.MaxValue, request.NumBytes, request.Context);
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpCallbackCompleted id={id}");
}
catch (Exception ex)
{
this.BlobManager.StorageTracer?.FasterStorageError($"{nameof(CancelAllRequests)} for access id={id} failed during FASTER completion callback", ex);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to re-throw here? Aren't we swallowing the exception?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no one to meaningfully catch exceptions here. And the partition has already been terminated (that's why the requests are being cancelled). So it is just about tracing at this point.

}
}
}
foreach (var id in this.pendingRemoveOperations.Keys.ToList())
{
if (this.pendingRemoveOperations.TryRemove(id, out var request))
{
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.RemoveSegmentAsync id={id} (Canceled)");
request.Callback(request.Result);

try
{
request.Callback(request.Result);
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpCallbackCompleted id={id}");
}
catch (Exception ex)
{
this.BlobManager.StorageTracer?.FasterStorageError($"{nameof(CancelAllRequests)} for access id={id} failed during FASTER completion callback", ex);
}
}
}
}
Expand Down Expand Up @@ -380,23 +398,9 @@ public override unsafe void ReadAsync(int segmentId, ulong sourceAddress, IntPtr
throw exception;
}

this.ReadFromBlobUnsafeAsync(blobEntry.PageBlob, (long)sourceAddress, (long)destinationAddress, readLength, id)
.ContinueWith((Task t) =>
{
if (this.pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request))
{
if (t.IsFaulted)
{
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id} (Failure)");
request.Callback(uint.MaxValue, request.NumBytes, request.Context);
}
else
{
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id}");
request.Callback(0, request.NumBytes, request.Context);
}
}
}, TaskContinuationOptions.ExecuteSynchronously);
// we are not awaiting this task because it uses FASTER's callback mechanism
// when the access is completed.
Comment on lines -387 to +402
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't immediately see why it's safe to remove this logic. Mind elaborating? Perhaps it's related to your comment that this task isn't awaited, but I did not fully get that either.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic was not removed, just placed somewhere else. These callbacks now happen right after the access completes. It should be functionally mostly the same, but I prefer not to use the tricky task stuff and instead just write out the straight-line code when possible.

this.ReadFromBlobUnsafeAsync(blobEntry.PageBlob, (long)sourceAddress, (long)destinationAddress, readLength, id);
}

/// <summary>
Expand Down Expand Up @@ -434,7 +438,7 @@ public override void WriteAsync(IntPtr sourceAddress, int segmentId, ulong desti
// Otherwise, some other thread beat us to it. Okay to use their blobs.
blobEntry = this.blobs[segmentId];
}
this.TryWriteAsync(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, id);
this.TryWriteToBlob(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, id);
}

//---- The actual read and write accesses to the page blobs
Expand Down Expand Up @@ -492,17 +496,18 @@ await this.BlobManager.PerformWithRetriesAsync(
}
}

unsafe Task ReadFromBlobUnsafeAsync(BlobUtilsV12.PageBlobClients blob, long sourceAddress, long destinationAddress, uint readLength, long id)
unsafe void ReadFromBlobUnsafeAsync(BlobUtilsV12.PageBlobClients blob, long sourceAddress, long destinationAddress, uint readLength, long id)
{
return this.ReadFromBlobAsync(new UnmanagedMemoryStream((byte*)destinationAddress, readLength, readLength, FileAccess.Write), blob, sourceAddress, readLength, id);
Task _ = this.ReadFromBlobAsync(new UnmanagedMemoryStream((byte*)destinationAddress, readLength, readLength, FileAccess.Write), blob, sourceAddress, readLength, id);
}

async Task ReadFromBlobAsync(UnmanagedMemoryStream stream, BlobUtilsV12.PageBlobClients blob, long sourceAddress, uint readLength, long id)
{
long readRangeStart = sourceAddress;
long readRangeEnd = readRangeStart + readLength;
string operationReadRange = $"[{readRangeStart}, {readRangeEnd}]";
using (stream)
bool IsFaulted = false;
try
{
long offset = 0;
while (readLength > 0)
Expand Down Expand Up @@ -555,62 +560,125 @@ await this.BlobManager.PerformWithRetriesAsync(
offset += length;
}
}
catch (Exception e)
{
IsFaulted = true;

// details about failed storage accesses are already traced by the error handler, but we still create another trace here
// to help us debug situations where exceptions are thrown in other places, and to clarify the execution path
this.BlobManager.StorageTracer?.FasterStorageError($"{nameof(ReadFromBlobAsync)} id={id} encountered exception", e);
}
finally
{
stream.Dispose();
}
davidmrdavid marked this conversation as resolved.
Show resolved Hide resolved

if (this.pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request))
{
try
{
if (IsFaulted)
{
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id} (Failure)");
request.Callback(uint.MaxValue, request.NumBytes, request.Context);
}
else
{
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id}");
request.Callback(0, request.NumBytes, request.Context);
}
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpCallbackCompleted id={id}");
}
catch (Exception e)
{
this.BlobManager.StorageTracer?.FasterStorageError($"{nameof(ReadFromBlobAsync)} id={id} failed during FASTER completion callback", e);
}
}

// this task is not awaited, so it must never throw exceptions.
}

void TryWriteAsync(BlobEntry blobEntry, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, long id)

void TryWriteToBlob(BlobEntry blobEntry, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, long id)
{
// If pageBlob is null, it is being created. Attempt to queue the write for the creator to complete after it is done
if (blobEntry.PageBlob.Default == null
&& blobEntry.TryQueueAction(() => this.WriteToBlobAsync(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, id)))
&& blobEntry.TryQueueAction(() => this.WriteToBlob(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, id)))
{
return;
}
// Otherwise, invoke directly.
this.WriteToBlobAsync(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, id);
this.WriteToBlob(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, id);
}

unsafe void WriteToBlobAsync(BlobEntry blobEntry, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, long id)
unsafe void WriteToBlob(BlobEntry blobEntry, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, long id)
{
this.WriteToBlobAsync(blobEntry, sourceAddress, (long)destinationAddress, numBytesToWrite, id)
.ContinueWith((Task t) =>
{
if (this.pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request))
{
if (t.IsFaulted)
{
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id} (Failure)");
request.Callback(uint.MaxValue, request.NumBytes, request.Context);
}
else
{
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id}");
request.Callback(0, request.NumBytes, request.Context);
}
}

if (this.underLease)
{
this.SingleWriterSemaphore.Release();
}

}, TaskContinuationOptions.ExecuteSynchronously);
// we are not awaiting this task because it uses FASTER's callback mechanism
// when the access is completed.
Task _ = this.WriteToBlobAsync(blobEntry, sourceAddress, (long)destinationAddress, numBytesToWrite, id);
}

async Task WriteToBlobAsync(BlobEntry blobEntry, IntPtr sourceAddress, long destinationAddress, uint numBytesToWrite, long id)
{
if (this.underLease)
{
// this semaphore is needed to avoid ambiguous e-tags under concurrent writes
await this.SingleWriterSemaphore.WaitAsync();
}

long offset = 0;
while (numBytesToWrite > 0)
bool IsFaulted = false;

try
{
var length = Math.Min(numBytesToWrite, MAX_UPLOAD_SIZE);
await this.WritePortionToBlobUnsafeAsync(blobEntry, sourceAddress, destinationAddress, offset, length, id).ConfigureAwait(false);
numBytesToWrite -= length;
offset += length;
long offset = 0;
while (numBytesToWrite > 0)
{
var length = Math.Min(numBytesToWrite, MAX_UPLOAD_SIZE);
await this.WritePortionToBlobUnsafeAsync(blobEntry, sourceAddress, destinationAddress, offset, length, id).ConfigureAwait(false);
numBytesToWrite -= length;
offset += length;
}
}
catch (Exception e)
{
IsFaulted = true;

// details about failed storage accesses are already traced by the error handler, but we still create another trace here
// to help us debug situations where exceptions are thrown in other places, and to clarify the execution path
this.BlobManager.StorageTracer?.FasterStorageError($"{nameof(WriteToBlobAsync)} id={id} encountered exception", e);
}
finally
{
if (this.underLease)
{
// always release this semaphore again
this.SingleWriterSemaphore.Release();
}
}

if (this.pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request))
{
try
{
if (IsFaulted)
{
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id} (Failure)");
request.Callback(uint.MaxValue, request.NumBytes, request.Context);
}
else
{
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpReturned AzureStorageDevice.WriteAsync id={id}");
request.Callback(0, request.NumBytes, request.Context);
}
this.BlobManager?.StorageTracer?.FasterStorageProgress($"StorageOpCallbackCompleted id={id}");
}
catch (Exception e)
{
this.BlobManager.StorageTracer?.FasterStorageError($"{nameof(WriteToBlobAsync)} id={id} failed during FASTER completion callback", e);
}
davidmrdavid marked this conversation as resolved.
Show resolved Hide resolved
}

// this task is not awaited, so it must never throw exceptions.
}
}
}