From fa515eb87f160ccaf6a2a9a08615c682261a1a04 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon <96087589+jalauzon-msft@users.noreply.github.com> Date: Wed, 4 Dec 2024 15:45:28 -0800 Subject: [PATCH] [Storage][DataMovement] Refactor/add bounds to CommitChunkHandler (#47417) --- .../src/CommitChunkHandler.cs | 173 ++++-------------- .../src/JobPartInternal.cs | 21 +-- .../src/QueueStageChunkArgs.cs | 27 +++ .../src/ServiceToServiceJobPart.cs | 71 ++----- .../src/Shared/DataMovementConstants.cs | 1 + .../src/StageChunkEventArgs.cs | 72 -------- .../src/StreamToUriJobPart.cs | 70 ++----- .../tests/CommitChunkHandlerTests.cs | 116 +++--------- 8 files changed, 129 insertions(+), 422 deletions(-) create mode 100644 sdk/storage/Azure.Storage.DataMovement/src/QueueStageChunkArgs.cs delete mode 100644 sdk/storage/Azure.Storage.DataMovement/src/StageChunkEventArgs.cs diff --git a/sdk/storage/Azure.Storage.DataMovement/src/CommitChunkHandler.cs b/sdk/storage/Azure.Storage.DataMovement/src/CommitChunkHandler.cs index 603b9c89579e..af4696cbd5ad 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/CommitChunkHandler.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/CommitChunkHandler.cs @@ -2,20 +2,14 @@ // Licensed under the MIT License. using System; -using Azure.Core; using System.Threading.Tasks; using System.Threading; -using System.Threading.Channels; -using Azure.Core.Pipeline; using Azure.Storage.Common; namespace Azure.Storage.DataMovement { internal class CommitChunkHandler : IDisposable { - // Indicates whether the current thread is processing stage chunks. - private static Task _processStageChunkEvents; - #region Delegate Definitions public delegate Task QueuePutBlockTaskInternal(long offset, long blockSize, long expectedLength, StorageResourceItemProperties properties); public delegate Task QueueCommitBlockTaskInternal(StorageResourceItemProperties sourceProperties); @@ -36,21 +30,17 @@ public struct Behaviors public InvokeFailedEventHandlerInternal InvokeFailedHandler { get; set; } } - private event SyncAsyncEventHandler _commitBlockHandler; - internal SyncAsyncEventHandler GetCommitBlockHandler() => _commitBlockHandler; - /// - /// Create channel of to keep track of that are + /// Create channel of to keep track of that are /// waiting to update the bytesTransferred and other required operations. /// - private readonly Channel _stageChunkChannel; - private CancellationToken _cancellationToken; + private readonly IProcessor _stageChunkProcessor; + private readonly CancellationToken _cancellationToken; private long _bytesTransferred; private readonly long _expectedLength; private readonly long _blockSize; private readonly DataTransferOrder _transferOrder; - private readonly ClientDiagnostics _clientDiagnostics; private readonly StorageResourceItemProperties _sourceProperties; public CommitChunkHandler( @@ -58,7 +48,6 @@ public CommitChunkHandler( long blockSize, Behaviors behaviors, DataTransferOrder transferOrder, - ClientDiagnostics clientDiagnostics, StorageResourceItemProperties sourceProperties, CancellationToken cancellationToken) { @@ -67,7 +56,14 @@ public CommitChunkHandler( throw Errors.InvalidExpectedLength(expectedLength); } Argument.AssertNotNull(behaviors, nameof(behaviors)); - Argument.AssertNotNull(clientDiagnostics, nameof(clientDiagnostics)); + + _cancellationToken = cancellationToken; + // Set bytes transferred to block size because we transferred the initial block + _bytesTransferred = blockSize; + _expectedLength = expectedLength; + _blockSize = blockSize; + _transferOrder = transferOrder; + _sourceProperties = sourceProperties; _queuePutBlockTask = behaviors.QueuePutBlockTask ?? throw Errors.ArgumentNull(nameof(behaviors.QueuePutBlockTask)); @@ -78,152 +74,61 @@ public CommitChunkHandler( _invokeFailedEventHandler = behaviors.InvokeFailedHandler ?? throw Errors.ArgumentNull(nameof(behaviors.InvokeFailedHandler)); - // Set expected length to perform commit task - _expectedLength = expectedLength; - - // Create channel of finished Stage Chunk Args to update the bytesTransferred - // and for ending tasks like commit block. - // The size of the channel should never exceed 50k (limit on blocks in a block blob). - // and that's in the worst case that we never read from the channel and had a maximum chunk blob. - _stageChunkChannel = Channel.CreateUnbounded( - new UnboundedChannelOptions() - { - // Single reader is required as we can only read and write to bytesTransferred value - SingleReader = true, - }); - _cancellationToken = cancellationToken; - - // Set bytes transferred to block size because we transferred the initial block - _bytesTransferred = blockSize; - - _processStageChunkEvents = Task.Run(() => NotifyOfPendingStageChunkEvents()); - - _blockSize = blockSize; - _transferOrder = transferOrder; - if (_transferOrder == DataTransferOrder.Sequential) - { - _commitBlockHandler += SequentialBlockEvent; - } - _commitBlockHandler += ConcurrentBlockEvent; - _clientDiagnostics = clientDiagnostics; - _sourceProperties = sourceProperties; + _stageChunkProcessor = ChannelProcessing.NewProcessor( + readers: 1, + capacity: DataMovementConstants.Channels.StageChunkCapacity); + _stageChunkProcessor.Process = ProcessCommitRange; } public void Dispose() { - // We no longer have to read from the channel. We are not expecting any more requests. - _stageChunkChannel.Writer.TryComplete(); - DisposeHandlers(); + _stageChunkProcessor.TryComplete(); } - private void DisposeHandlers() + public async ValueTask QueueChunkAsync(QueueStageChunkArgs args) { - if (_transferOrder == DataTransferOrder.Sequential) - { - _commitBlockHandler -= SequentialBlockEvent; - } - _commitBlockHandler -= ConcurrentBlockEvent; + await _stageChunkProcessor.QueueAsync(args).ConfigureAwait(false); } - private async Task ConcurrentBlockEvent(StageChunkEventArgs args) + private async Task ProcessCommitRange(QueueStageChunkArgs args, CancellationToken cancellationToken = default) { try { - if (args.Success) - { - // Let's add to the channel, and our notifier will handle the chunks. - _stageChunkChannel.Writer.TryWrite(args); - } - else - { - // Log an unexpected error since it came back unsuccessful - throw args.Exception; - } - } - catch (Exception ex) - { - // Log an unexpected error since it came back unsuccessful - await _invokeFailedEventHandler(ex).ConfigureAwait(false); - } - } + _bytesTransferred += args.BytesTransferred; + _reportProgressInBytes(args.BytesTransferred); - private async Task NotifyOfPendingStageChunkEvents() - { - try - { - while (await _stageChunkChannel.Reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false)) + if (_bytesTransferred == _expectedLength) { - // Read one event argument at a time. - StageChunkEventArgs args = await _stageChunkChannel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false); - - // don't need to use Interlocked.Add() as we are reading one event at a time - // and _bytesTransferred is not being read/updated from any other thread - _bytesTransferred += args.BytesTransferred; - - // Report the incremental bytes transferred - _reportProgressInBytes(args.BytesTransferred); - - if (_bytesTransferred == _expectedLength) - { - // Add CommitBlockList task to the channel - await _queueCommitBlockTask(_sourceProperties).ConfigureAwait(false); - } - else if (_bytesTransferred > _expectedLength) - { - throw Errors.MismatchLengthTransferred( - expectedLength: _expectedLength, - actualLength: _bytesTransferred); - } + // Add CommitBlockList task to the channel + await _queueCommitBlockTask(_sourceProperties).ConfigureAwait(false); } - } - catch (Exception ex) - { - await _invokeFailedEventHandler(ex).ConfigureAwait(false); - } - } - - private async Task SequentialBlockEvent(StageChunkEventArgs args) - { - try - { - if (args.Success) + else if (_bytesTransferred < _expectedLength) { - long oldOffset = args.Offset; - long newOffset = oldOffset + _blockSize; - if (newOffset < _expectedLength) + // If this is a sequential transfer, we need to queue the next chunk + if (_transferOrder == DataTransferOrder.Sequential) { + long newOffset = args.Offset + _blockSize; long blockLength = (newOffset + _blockSize < _expectedLength) ? - _blockSize : - _expectedLength - newOffset; - await _queuePutBlockTask(newOffset, blockLength, _expectedLength, _sourceProperties).ConfigureAwait(false); + _blockSize : + _expectedLength - newOffset; + await _queuePutBlockTask( + newOffset, + blockLength, + _expectedLength, + _sourceProperties).ConfigureAwait(false); } } - else + else // _bytesTransferred > _expectedLength { - // Log an unexpected error since it came back unsuccessful - throw args.Exception; + throw Errors.MismatchLengthTransferred( + expectedLength: _expectedLength, + actualLength: _bytesTransferred); } } catch (Exception ex) { - // Log an unexpected error since it came back unsuccessful await _invokeFailedEventHandler(ex).ConfigureAwait(false); } } - - public async Task InvokeEvent(StageChunkEventArgs args) - { - // There's a race condition where the event handler was disposed and an event - // was already invoked, we should skip over this as the download chunk handler - // was already disposed, and we should just ignore any more incoming events. - if (_commitBlockHandler != null) - { - await _commitBlockHandler.RaiseAsync( - args, - nameof(CommitChunkHandler), - nameof(_commitBlockHandler), - _clientDiagnostics).ConfigureAwait(false); - } - } } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs b/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs index 67d1590974fa..dc9e3bf53fd0 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs @@ -504,26 +504,7 @@ internal static long ParseRangeTotalLength(string range) return long.Parse(range.Substring(lengthSeparator + 1), CultureInfo.InvariantCulture); } - internal static List<(long Offset, long Size)> GetRangeList(long blockSize, long fileLength) - { - // The list tracking blocks IDs we're going to commit - List<(long Offset, long Size)> partitions = new List<(long, long)>(); - - // Partition the stream into individual blocks - foreach ((long Offset, long Length) block in GetPartitionIndexes(fileLength, blockSize)) - { - /* We need to do this first! Length is calculated on the fly based on stream buffer - * contents; We need to record the partition data first before consuming the stream - * asynchronously. */ - partitions.Add(block); - } - return partitions; - } - - /// - /// Partition a stream into a series of blocks buffered as needed by an array pool. - /// - private static IEnumerable<(long Offset, long Length)> GetPartitionIndexes( + protected static IEnumerable<(long Offset, long Length)> GetRanges( long streamLength, // StreamLength needed to divide before hand long blockSize) { diff --git a/sdk/storage/Azure.Storage.DataMovement/src/QueueStageChunkArgs.cs b/sdk/storage/Azure.Storage.DataMovement/src/QueueStageChunkArgs.cs new file mode 100644 index 000000000000..7bb59cc7f329 --- /dev/null +++ b/sdk/storage/Azure.Storage.DataMovement/src/QueueStageChunkArgs.cs @@ -0,0 +1,27 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +namespace Azure.Storage.DataMovement +{ + /// + /// This class is interchangable for + /// Stage Block (Put Block), Stage Block From Uri (Put Block From URL), + /// Append Block (Append Block), Append Block From Uri (Append Block From URL), + /// Upload Page (Put Page), Upload Pages From Uri (Put Pages From URL) + /// + /// Basically any transfer operation that must end in a Commit Block List + /// will end up using this internal event argument to track the success + /// and the bytes transferred to ensure the correct amount of bytes are tranferred. + /// + internal class QueueStageChunkArgs + { + public long Offset { get; } + public long BytesTransferred { get; } + + public QueueStageChunkArgs(long offset, long bytesTransferred) + { + Offset = offset; + BytesTransferred = bytesTransferred; + } + } +} diff --git a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs index f0ca9318f1d6..03fc82abfb2d 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.Globalization; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Azure.Storage.Common; @@ -216,27 +217,29 @@ await StartSingleCallCopy(length).ConfigureAwait(false)) // Perform a series of chunk copies followed by a commit long blockSize = _transferChunkSize; - - _commitBlockHandler = GetCommitController( + _commitBlockHandler = new CommitChunkHandler( expectedLength: length, blockSize: blockSize, - this, + GetBlockListCommitHandlerBehaviors(this), _destinationResource.TransferType, - sourceProperties); + sourceProperties, + _cancellationToken); + // If we cannot upload in one shot, initiate the parallel block uploader if (await CreateDestinationResource(length, blockSize).ConfigureAwait(false)) { - List<(long Offset, long Length)> commitBlockList = GetRangeList(blockSize, length); + IEnumerable<(long Offset, long Length)> ranges = GetRanges(length, blockSize); if (_destinationResource.TransferType == DataTransferOrder.Unordered) { - await QueueStageBlockRequests(commitBlockList, length, sourceProperties).ConfigureAwait(false); + await QueueStageBlockRequests(ranges, length, sourceProperties).ConfigureAwait(false); } else // Sequential { // Queue the first partitioned block task + (long Offset, long Length) first = ranges.First(); await QueueStageBlockRequest( - commitBlockList[0].Offset, - commitBlockList[0].Length, + first.Offset, + first.Length, length, sourceProperties).ConfigureAwait(false); } @@ -330,21 +333,6 @@ await _destinationResource.CopyBlockFromUriAsync( } #region CommitChunkController - internal CommitChunkHandler GetCommitController( - long expectedLength, - long blockSize, - ServiceToServiceJobPart jobPart, - DataTransferOrder transferType, - StorageResourceItemProperties sourceProperties) - => new CommitChunkHandler( - expectedLength, - blockSize, - GetBlockListCommitHandlerBehaviors(jobPart), - transferType, - ClientDiagnostics, - sourceProperties, - _cancellationToken); - internal static CommitChunkHandler.Behaviors GetBlockListCommitHandlerBehaviors( ServiceToServiceJobPart jobPart) { @@ -381,13 +369,13 @@ await _destinationResource.CompleteTransferAsync( } private async Task QueueStageBlockRequests( - List<(long Offset, long Size)> commitBlockList, + IEnumerable<(long Offset, long Size)> ranges, long expectedLength, StorageResourceItemProperties sourceProperties) { _queueingTasks = true; // Partition the stream into individual blocks - foreach ((long Offset, long Length) block in commitBlockList) + foreach ((long Offset, long Length) block in ranges) { if (_cancellationToken.IsCancellationRequested) { @@ -440,16 +428,10 @@ await _destinationResource.CopyBlockFromUriAsync( // The chunk handler may have been disposed in failure case if (_commitBlockHandler != null) { - // Invoke event handler to keep track of all the stage blocks - await _commitBlockHandler.InvokeEvent( - new StageChunkEventArgs( - transferId: _dataTransfer.Id, - success: true, - offset: offset, - bytesTransferred: blockLength, - exception: default, - isRunningSynchronously: true, - cancellationToken: _cancellationToken)).ConfigureAwait(false); + // Queue result to increment bytes transferred and check for completion + await _commitBlockHandler.QueueChunkAsync(new QueueStageChunkArgs( + offset: offset, + bytesTransferred: blockLength)).ConfigureAwait(false); } } catch (RequestFailedException ex) @@ -469,24 +451,7 @@ await _commitBlockHandler.InvokeEvent( } catch (Exception ex) { - if (_commitBlockHandler != null) - { - await _commitBlockHandler.InvokeEvent( - new StageChunkEventArgs( - transferId: _dataTransfer.Id, - success: false, - offset: offset, - bytesTransferred: blockLength, - exception: ex, - isRunningSynchronously: true, - cancellationToken: _cancellationToken)).ConfigureAwait(false); - } - else - { - // If the _commitBlockHandler has been disposed before we call to it - // we should at least filter the exception to error handling just in case. - await InvokeFailedArgAsync(ex).ConfigureAwait(false); - } + await InvokeFailedArgAsync(ex).ConfigureAwait(false); } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs index b71bbe4302de..fac2248bfb09 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs @@ -20,6 +20,7 @@ internal static class Channels internal const int JobPartCapacity = 1000; internal const int JobChunkCapacity = 1000; internal const int DownloadChunkCapacity = 16; + internal const int StageChunkCapacity = 1000; } internal static class ConcurrencyTuner diff --git a/sdk/storage/Azure.Storage.DataMovement/src/StageChunkEventArgs.cs b/sdk/storage/Azure.Storage.DataMovement/src/StageChunkEventArgs.cs deleted file mode 100644 index c3e14c8cbe73..000000000000 --- a/sdk/storage/Azure.Storage.DataMovement/src/StageChunkEventArgs.cs +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -using System; -using System.Threading; -using Azure.Core; -using Azure.Storage.Common; - -namespace Azure.Storage.DataMovement -{ - /// - /// This class is interchangable for - /// Stage Block (Put Block), Stage Block From Uri (Put Block From URL), - /// Append Block (Append Block), Append Block From Uri (Append Block From URL), - /// Upload Page (Put Page), Upload Pages From Uri (Put Pages From URL) - /// - /// Basically any transfer operation that must end in a Commit Block List - /// will end up using this internal event argument to track the success - /// and the bytes transferred to ensure the correct amount of bytes are tranferred. - /// - internal class StageChunkEventArgs : DataTransferEventArgs - { - public bool Success { get; } - - public long Offset { get; } - - /// - /// Will be 0 if Success is false - /// - public long BytesTransferred { get; } - - /// - /// If is false, this value will be populated - /// with the exception that was thrown. - /// - public Exception Exception { get; } - - /// - /// Constructor - /// - /// - /// - /// - /// - /// - /// - /// - public StageChunkEventArgs( - string transferId, - bool success, - long offset, - long bytesTransferred, - Exception exception, - bool isRunningSynchronously, - CancellationToken cancellationToken) : - base(transferId, isRunningSynchronously, cancellationToken) - { - if (success && exception != null) - { - Argument.AssertNull(exception, nameof(exception)); - } - else if (!success && exception == null) - { - Argument.AssertNotNull(exception, nameof(exception)); - } - Success = success; - Offset = offset; - BytesTransferred = bytesTransferred; - Exception = exception; - } - } -} diff --git a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs index 05558d51a42e..350743cec797 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs @@ -10,6 +10,7 @@ using Azure.Storage.Shared; using Azure.Core; using Azure.Storage.Common; +using System.Linq; namespace Azure.Storage.DataMovement { @@ -209,13 +210,13 @@ await CreateDestinationResource( return; } long blockSize = _transferChunkSize; - - _commitBlockHandler = GetCommitController( + _commitBlockHandler = new CommitChunkHandler( expectedLength: length, blockSize: blockSize, - this, + GetBlockListCommitHandlerBehaviors(this), _destinationResource.TransferType, - properties); + properties, + _cancellationToken); bool destinationCreated = await CreateDestinationResource( blockSize, @@ -225,17 +226,18 @@ await CreateDestinationResource( if (destinationCreated) { // If we cannot upload in one shot, initiate the parallel block uploader - List<(long Offset, long Length)> rangeList = GetRangeList(blockSize, length); + IEnumerable<(long Offset, long Length)> ranges = GetRanges(length, blockSize); if (_destinationResource.TransferType == DataTransferOrder.Unordered) { - await QueueStageBlockRequests(rangeList, length, properties).ConfigureAwait(false); + await QueueStageBlockRequests(ranges, length, properties).ConfigureAwait(false); } else // Sequential { // Queue the first partitioned block task + (long Offset, long Length) first = ranges.First(); await QueueStageBlockRequest( - rangeList[0].Offset, - rangeList[0].Length, + first.Offset, + first.Length, length, properties).ConfigureAwait(false); } @@ -358,21 +360,6 @@ await _destinationResource.CopyFromStreamAsync( } #region CommitChunkController - internal CommitChunkHandler GetCommitController( - long expectedLength, - long blockSize, - StreamToUriJobPart jobPart, - DataTransferOrder transferType, - StorageResourceItemProperties sourceProperties) - => new CommitChunkHandler( - expectedLength, - blockSize, - GetBlockListCommitHandlerBehaviors(jobPart), - transferType, - ClientDiagnostics, - sourceProperties, - _cancellationToken); - internal static CommitChunkHandler.Behaviors GetBlockListCommitHandlerBehaviors( StreamToUriJobPart jobPart) { @@ -423,38 +410,15 @@ await _destinationResource.CopyFromStreamAsync( // The chunk handler may have been disposed in failure case if (_commitBlockHandler != null) { - // Invoke event handler to keep track of all the stage blocks - await _commitBlockHandler.InvokeEvent( - new StageChunkEventArgs( - transferId: _dataTransfer.Id, - success: true, - offset: offset, - bytesTransferred: blockLength, - exception: default, - isRunningSynchronously: true, - cancellationToken: _cancellationToken)).ConfigureAwait(false); + // Queue result to increment bytes transferred and check for completion + await _commitBlockHandler.QueueChunkAsync(new QueueStageChunkArgs( + offset: offset, + bytesTransferred: blockLength)).ConfigureAwait(false); } } catch (Exception ex) { - if (_commitBlockHandler != null) - { - await _commitBlockHandler.InvokeEvent( - new StageChunkEventArgs( - transferId: _dataTransfer.Id, - success: false, - offset: offset, - bytesTransferred: blockLength, - exception: ex, - isRunningSynchronously: true, - cancellationToken: _cancellationToken)).ConfigureAwait(false); - } - else - { - // If the _commitBlockHandler has been disposed before we call to it - // we should at least filter the exception to error handling just in case. - await InvokeFailedArgAsync(ex).ConfigureAwait(false); - } + await InvokeFailedArgAsync(ex).ConfigureAwait(false); } } @@ -476,13 +440,13 @@ await _destinationResource.CompleteTransferAsync( } private async Task QueueStageBlockRequests( - List<(long Offset, long Size)> rangeList, + IEnumerable<(long Offset, long Size)> ranges, long completeLength, StorageResourceItemProperties sourceProperties) { _queueingTasks = true; // Partition the stream into individual blocks - foreach ((long Offset, long Length) block in rangeList) + foreach ((long Offset, long Length) block in ranges) { if (_cancellationToken.IsCancellationRequested) { diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/CommitChunkHandlerTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/CommitChunkHandlerTests.cs index 116f62214d1a..2ba3479e64a9 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/CommitChunkHandlerTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/CommitChunkHandlerTests.cs @@ -10,6 +10,7 @@ using Azure.Core; using Azure.Storage.Test; using Azure.Core.Pipeline; +using System.Threading.Channels; namespace Azure.Storage.DataMovement.Tests { @@ -166,20 +167,14 @@ public async Task OneChunkTransfer(long blockSize) InvokeFailedHandler = mockCommitChunkBehaviors.InvokeFailedEventHandlerTask.Object, }, DataTransferOrder.Unordered, - ClientDiagnostics, default, CancellationToken.None); // Make one chunk that would meet the expected length - await commitBlockHandler.InvokeEvent(new StageChunkEventArgs( - transferId: "fake-id", - success: true, + await commitBlockHandler.QueueChunkAsync(new QueueStageChunkArgs( // Before commit block is called, one block chunk has already been added when creating the destination offset: blockSize, - bytesTransferred: blockSize, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + bytesTransferred: blockSize)); VerifyDelegateInvocations( behaviors: mockCommitChunkBehaviors, @@ -209,20 +204,14 @@ public async Task ParallelChunkTransfer(long blockSize) InvokeFailedHandler = mockCommitChunkBehaviors.InvokeFailedEventHandlerTask.Object, }, DataTransferOrder.Unordered, - ClientDiagnostics, default, CancellationToken.None); // Make one chunk that would update the bytes but not cause a commit block list to occur - await commitBlockHandler.InvokeEvent(new StageChunkEventArgs( - transferId: "fake-id", - success: true, + await commitBlockHandler.QueueChunkAsync(new QueueStageChunkArgs( // Before commit block is called, one block chunk has already been added when creating the destination offset: blockSize, - bytesTransferred: blockSize, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + bytesTransferred: blockSize)); // Assert VerifyDelegateInvocations( @@ -233,15 +222,10 @@ await commitBlockHandler.InvokeEvent(new StageChunkEventArgs( expectedCompleteFileCount: 0); // Now add the last block to meet the required commited block amount. - await commitBlockHandler.InvokeEvent(new StageChunkEventArgs( - transferId: "fake-id", - success: true, + await commitBlockHandler.QueueChunkAsync(new QueueStageChunkArgs( // Before commit block is called, one block chunk has already been added when creating the destination offset: blockSize * 2, - bytesTransferred: blockSize, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + bytesTransferred: blockSize)); // Assert VerifyDelegateInvocations( @@ -272,20 +256,14 @@ public async Task ParallelChunkTransfer_ExceedError(long blockSize) InvokeFailedHandler = mockCommitChunkBehaviors.InvokeFailedEventHandlerTask.Object, }, DataTransferOrder.Unordered, - ClientDiagnostics, default, CancellationToken.None); // Make one chunk that would update the bytes that would cause the bytes to exceed the expected amount - await commitBlockHandler.InvokeEvent(new StageChunkEventArgs( - transferId: "fake-id", - success: true, + await commitBlockHandler.QueueChunkAsync(new QueueStageChunkArgs( // Before commit block is called, one block chunk has already been added when creating the destination offset: blockSize, - bytesTransferred: blockSize * 2, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + bytesTransferred: blockSize * 2)); // Assert VerifyDelegateInvocations( @@ -318,7 +296,6 @@ public async Task ParallelChunkTransfer_MultipleProcesses(long blockSize, int ta InvokeFailedHandler = mockCommitChunkBehaviors.InvokeFailedEventHandlerTask.Object, }, DataTransferOrder.Unordered, - ClientDiagnostics, default, CancellationToken.None); @@ -326,15 +303,10 @@ public async Task ParallelChunkTransfer_MultipleProcesses(long blockSize, int ta for (int i = 0; i < taskSize; i++) { - Task task = commitBlockHandler.InvokeEvent(new StageChunkEventArgs( - transferId: "fake-id", - success: true, + Task task = Task.Run(async () => await commitBlockHandler.QueueChunkAsync(new QueueStageChunkArgs( // Before commit block is called, one block chunk has already been added when creating the destination offset: blockSize, - bytesTransferred: blockSize, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + bytesTransferred: blockSize))); runningTasks.Add(task); } @@ -371,20 +343,14 @@ public async Task SequentialChunkTransfer(long blockSize) InvokeFailedHandler = mockCommitChunkBehaviors.InvokeFailedEventHandlerTask.Object, }, DataTransferOrder.Sequential, - ClientDiagnostics, default, CancellationToken.None); // Make one chunk that would update the bytes but not cause a commit block list to occur - await commitBlockHandler.InvokeEvent(new StageChunkEventArgs( - transferId: "fake-id", - success: true, + await commitBlockHandler.QueueChunkAsync(new QueueStageChunkArgs( // Before commit block is called, one block chunk has already been added when creating the destination offset: blockSize, - bytesTransferred: blockSize, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + bytesTransferred: blockSize)); // Assert VerifyDelegateInvocations( @@ -395,15 +361,10 @@ await commitBlockHandler.InvokeEvent(new StageChunkEventArgs( expectedCompleteFileCount: 0); // Now add the last block to meet the required commited block amount. - await commitBlockHandler.InvokeEvent(new StageChunkEventArgs( - transferId: "fake-id", - success: true, + await commitBlockHandler.QueueChunkAsync(new QueueStageChunkArgs( // Before commit block is called, one block chunk has already been added when creating the destination offset: blockSize * 2, - bytesTransferred: blockSize, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + bytesTransferred: blockSize)); // Assert VerifyDelegateInvocations( @@ -434,20 +395,14 @@ public async Task SequentialChunkTransfer_ExceedError(long blockSize) InvokeFailedHandler = mockCommitChunkBehaviors.InvokeFailedEventHandlerTask.Object, }, DataTransferOrder.Sequential, - ClientDiagnostics, default, CancellationToken.None); // Make one chunk that would update the bytes that would cause the bytes to exceed the expected amount - await commitBlockHandler.InvokeEvent(new StageChunkEventArgs( - transferId: "fake-id", - success: true, + await commitBlockHandler.QueueChunkAsync(new QueueStageChunkArgs( // Before commit block is called, one block chunk has already been added when creating the destination offset: blockSize, - bytesTransferred: blockSize * 2, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + bytesTransferred: blockSize * 2)); // Assert VerifyDelegateInvocations( @@ -479,19 +434,13 @@ public async Task GetPutBlockTask_ExpectedFailure() InvokeFailedHandler = mockCommitChunkBehaviors.InvokeFailedEventHandlerTask.Object, }, transferOrder: DataTransferOrder.Sequential, - ClientDiagnostics, default, CancellationToken.None); // Act - await commitBlockHandler.InvokeEvent(new StageChunkEventArgs( - transferId: "fake-id", - success: true, + await commitBlockHandler.QueueChunkAsync(new QueueStageChunkArgs( offset: blockSize, - bytesTransferred: blockSize, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + bytesTransferred: blockSize)); // Assert VerifyDelegateInvocations( @@ -522,19 +471,13 @@ public async Task GetCommitBlockTask_ExpectedFailure() InvokeFailedHandler = mockCommitChunkBehaviors.InvokeFailedEventHandlerTask.Object, }, transferOrder: DataTransferOrder.Unordered, - ClientDiagnostics, default, CancellationToken.None); // Act - await commitBlockHandler.InvokeEvent(new StageChunkEventArgs( - transferId: "fake-id", - success: true, + await commitBlockHandler.QueueChunkAsync(new QueueStageChunkArgs( offset: 0, - bytesTransferred: blockSize, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + bytesTransferred: blockSize)); // Assert VerifyDelegateInvocations( @@ -546,7 +489,7 @@ await commitBlockHandler.InvokeEvent(new StageChunkEventArgs( } [Test] - public async Task DisposedEventHandler() + public void Dispose() { // Arrange - Create CommitChunkHandler then Dispose it so the event handler is disposed MockCommitChunkBehaviors mockCommitChunkBehaviors = GetCommitChunkBehaviors(); @@ -564,15 +507,14 @@ public async Task DisposedEventHandler() InvokeFailedHandler = mockCommitChunkBehaviors.InvokeFailedEventHandlerTask.Object, }, transferOrder: DataTransferOrder.Unordered, - ClientDiagnostics, default, CancellationToken.None); // Act commitBlockHandler.Dispose(); - // Assert - Do not throw when trying to invoke the event handler when disposed - await commitBlockHandler.InvokeEvent(default); + Assert.ThrowsAsync(async () => + await commitBlockHandler.QueueChunkAsync(default)); VerifyDelegateInvocations( behaviors: mockCommitChunkBehaviors, @@ -618,20 +560,14 @@ public async Task CompleteTransferTask_Properties() InvokeFailedHandler = mockCommitChunkBehaviors.InvokeFailedEventHandlerTask.Object, }, DataTransferOrder.Unordered, - ClientDiagnostics, properties, CancellationToken.None); // Make one chunk that would meet the expected length - await commitBlockHandler.InvokeEvent(new StageChunkEventArgs( - transferId: "fake-id", - success: true, + await commitBlockHandler.QueueChunkAsync(new QueueStageChunkArgs( // Before commit block is called, one block chunk has already been added when creating the destination offset: blockSize, - bytesTransferred: blockSize, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + bytesTransferred: blockSize)); VerifyDelegateInvocations( behaviors: mockCommitChunkBehaviors,