diff --git a/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPartPlanFile.cs b/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPartPlanFile.cs index a4c83d8653d5..9859216de85c 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPartPlanFile.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPartPlanFile.cs @@ -62,10 +62,20 @@ public static async Task CreateJobPartPlanFileAsync( FileName = fileName }; - using (FileStream fileStream = File.Create(result.FileName.ToString())) + try { - await headerStream.CopyToAsync(fileStream, DefaultBufferSize, cancellationToken).ConfigureAwait(false); + using (FileStream fileStream = File.Create(result.FileName.ToString())) + { + await headerStream.CopyToAsync(fileStream, DefaultBufferSize, cancellationToken).ConfigureAwait(false); + } } + catch (Exception) + { + // will handle if file has not been created yet + File.Delete(result.FileName.ToString()); + throw; + } + return result; } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPlanFile.cs b/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPlanFile.cs index 59312b2c0499..744bfe3ebe10 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPlanFile.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPlanFile.cs @@ -58,9 +58,18 @@ public static async Task CreateJobPlanFileAsync( string filePath = Path.Combine(checkpointerPath, fileName); JobPlanFile jobPlanFile = new(id, filePath); - using (FileStream fileStream = File.Create(jobPlanFile.FilePath)) + try { - await headerStream.CopyToAsync(fileStream, DefaultBufferSize, cancellationToken).ConfigureAwait(false); + using (FileStream fileStream = File.Create(jobPlanFile.FilePath)) + { + await headerStream.CopyToAsync(fileStream, DefaultBufferSize, cancellationToken).ConfigureAwait(false); + } + } + catch (Exception) + { + // will handle if file has not been created yet + File.Delete(jobPlanFile.FilePath); + throw; } return jobPlanFile; diff --git a/sdk/storage/Azure.Storage.DataMovement/src/LocalTransferCheckpointer.cs b/sdk/storage/Azure.Storage.DataMovement/src/LocalTransferCheckpointer.cs index 4d1c537bbe8e..10d0ae06e7a6 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/LocalTransferCheckpointer.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/LocalTransferCheckpointer.cs @@ -113,6 +113,13 @@ public override async Task AddNewJobPartAsync( CancellationHelper.ThrowIfCancellationRequested(cancellationToken); headerStream.Position = 0; + if (!_transferStates.ContainsKey(transferId)) + { + // We should never get here because AddNewJobAsync should + // always be called first. + throw Errors.MissingTransferIdAddPartCheckpointer(transferId, partNumber); + } + JobPartPlanFile mappedFile = await JobPartPlanFile.CreateJobPartPlanFileAsync( _pathToCheckpointer, transferId, @@ -121,16 +128,7 @@ public override async Task AddNewJobPartAsync( cancellationToken).ConfigureAwait(false); // Add the job part into the current state - if (_transferStates.ContainsKey(transferId)) - { - _transferStates[transferId].JobParts.Add(partNumber, mappedFile); - } - else - { - // We should never get here because AddNewJobAsync should - // always be called first. - throw Errors.MissingTransferIdAddPartCheckpointer(transferId, partNumber); - } + _transferStates[transferId].JobParts.Add(partNumber, mappedFile); } public override Task CurrentJobPartCountAsync( diff --git a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs index dbb764ed878d..bc28cc5611d8 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs @@ -179,12 +179,12 @@ public static ServiceToServiceJobPart CreateJobPartFromCheckpoint( public override async Task ProcessPartToChunkAsync() { - await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false); - - long? fileLength = default; - StorageResourceItemProperties sourceProperties = default; try { + await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false); + + long? fileLength = default; + StorageResourceItemProperties sourceProperties = default; fileLength = _sourceResource.Length; sourceProperties = await _sourceResource.GetPropertiesAsync(_cancellationToken).ConfigureAwait(false); await _destinationResource.SetPermissionsAsync( @@ -193,60 +193,58 @@ await _destinationResource.SetPermissionsAsync( _cancellationToken).ConfigureAwait(false); fileLength = sourceProperties.ResourceLength; - } - catch (Exception ex) - { - // TODO: logging when given the event handler - await InvokeFailedArg(ex).ConfigureAwait(false); - return; - } - if (!fileLength.HasValue) - { - await InvokeFailedArg(Errors.UnableToGetLength()).ConfigureAwait(false); - return; - } - long length = fileLength.Value; + if (!fileLength.HasValue) + { + await InvokeFailedArg(Errors.UnableToGetLength()).ConfigureAwait(false); + return; + } + long length = fileLength.Value; - // Perform a single copy operation - if (_initialTransferSize >= length) - { - await QueueChunkToChannelAsync( - async () => - await StartSingleCallCopy(length).ConfigureAwait(false)) - .ConfigureAwait(false); - return; - } + // Perform a single copy operation + if (_initialTransferSize >= length) + { + await QueueChunkToChannelAsync( + async () => + await StartSingleCallCopy(length).ConfigureAwait(false)) + .ConfigureAwait(false); + return; + } - // Perform a series of chunk copies followed by a commit - long blockSize = _transferChunkSize; - - _commitBlockHandler = GetCommitController( - expectedLength: length, - blockSize: blockSize, - this, - _destinationResource.TransferType, - sourceProperties); - // If we cannot upload in one shot, initiate the parallel block uploader - if (await CreateDestinationResource(length, blockSize).ConfigureAwait(false)) - { - List<(long Offset, long Length)> commitBlockList = GetRangeList(blockSize, length); - if (_destinationResource.TransferType == DataTransferOrder.Unordered) + // Perform a series of chunk copies followed by a commit + long blockSize = _transferChunkSize; + + _commitBlockHandler = GetCommitController( + expectedLength: length, + blockSize: blockSize, + this, + _destinationResource.TransferType, + sourceProperties); + // If we cannot upload in one shot, initiate the parallel block uploader + if (await CreateDestinationResource(length, blockSize).ConfigureAwait(false)) { - await QueueStageBlockRequests(commitBlockList, length, sourceProperties).ConfigureAwait(false); + List<(long Offset, long Length)> commitBlockList = GetRangeList(blockSize, length); + if (_destinationResource.TransferType == DataTransferOrder.Unordered) + { + await QueueStageBlockRequests(commitBlockList, length, sourceProperties).ConfigureAwait(false); + } + else // Sequential + { + // Queue the first partitioned block task + await QueueStageBlockRequest( + commitBlockList[0].Offset, + commitBlockList[0].Length, + length, + sourceProperties).ConfigureAwait(false); + } } - else // Sequential + else { - // Queue the first partitioned block task - await QueueStageBlockRequest( - commitBlockList[0].Offset, - commitBlockList[0].Length, - length, - sourceProperties).ConfigureAwait(false); + await CheckAndUpdateCancellationStateAsync().ConfigureAwait(false); } } - else + catch (Exception ex) { - await CheckAndUpdateCancellationStateAsync().ConfigureAwait(false); + await InvokeFailedArg(ex).ConfigureAwait(false); } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs index db106b2d6544..c41a882de603 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs @@ -186,10 +186,10 @@ public override async Task ProcessPartToChunkAsync() // Attempt to get the length, it's possible the file could // not be accessible (or does not exist). string operationName = $"{nameof(TransferManager.StartTransferAsync)}"; - await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false); - long? fileLength = default; try { + await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false); + long? fileLength = default; StorageResourceItemProperties properties = await _sourceResource.GetPropertiesAsync(_cancellationToken).ConfigureAwait(false); fileLength = properties.ResourceLength; diff --git a/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs b/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs index 2fcfd6170cdc..d778320fdf63 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs @@ -281,7 +281,15 @@ public void DisposeHandlers() /// An IEnumerable that contains the job parts public virtual async IAsyncEnumerable ProcessJobToJobPartAsync() { - await OnJobStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false); + try + { + await OnJobStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false); + } + catch (Exception ex) + { + await InvokeFailedArgAsync(ex).ConfigureAwait(false); + yield break; + } int partNumber = 0; if (_jobParts.Count == 0) @@ -324,7 +332,18 @@ public virtual async IAsyncEnumerable ProcessJobToJobPartAsync( } } - if (!await _checkpointer.IsEnumerationCompleteAsync(_dataTransfer.Id, _cancellationToken).ConfigureAwait(false)) + bool isEnumerationComplete; + try + { + isEnumerationComplete = await _checkpointer.IsEnumerationCompleteAsync(_dataTransfer.Id, _cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + await InvokeFailedArgAsync(ex).ConfigureAwait(false); + yield break; + } + + if (!isEnumerationComplete) { await foreach (JobPartInternal jobPartInternal in GetStorageResourcesAsync().ConfigureAwait(false)) { @@ -333,8 +352,15 @@ public virtual async IAsyncEnumerable ProcessJobToJobPartAsync( } } - // Call regardless of the outcome of enumeration so job can pause/finish - await OnEnumerationComplete().ConfigureAwait(false); + try + { + // Call regardless of the outcome of enumeration so job can pause/finish + await OnEnumerationComplete().ConfigureAwait(false); + } + catch (Exception ex) + { + await InvokeFailedArgAsync(ex).ConfigureAwait(false); + } } private async IAsyncEnumerable GetStorageResourcesAsync() diff --git a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs index 55983aa0db1e..cecc1c8fd3be 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs @@ -196,10 +196,9 @@ public override async Task ProcessPartToChunkAsync() { // we can default the length to 0 because we know the destination is local and // does not require a length to be created. - await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false); - try { + await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false); if (!_sourceResource.Length.HasValue) { await UnknownDownloadInternal().ConfigureAwait(false);