Skip to content

Commit

Permalink
[Storage][DataMovement] Fixes for single transfer size (#46875)
Browse files Browse the repository at this point in the history
  • Loading branch information
jalauzon-msft authored Oct 30, 2024
1 parent d056e18 commit a47c2f4
Show file tree
Hide file tree
Showing 17 changed files with 41 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,12 @@ internal class AppendBlobStorageResource : StorageResourceItemInternal

public override string ProviderId => "blob";

/// <summary>
/// Defines the recommended Transfer Type for the storage resource.
/// </summary>
protected override DataTransferOrder TransferType => DataTransferOrder.Sequential;

/// <summary>
/// Defines the maximum chunk size for the storage resource.
/// </summary>
protected override long MaxSupportedSingleTransferSize => Constants.Blob.Append.MaxAppendBlockBytes;

protected override long MaxSupportedChunkSize => Constants.Blob.Append.MaxAppendBlockBytes;

/// <summary>
/// Length of the storage resource. This information is obtained during a GetStorageResources API call.
///
/// Will return default if the length was not set by a GetStorageResources API call.
/// </summary>
protected override long? Length => ResourceProperties?.ResourceLength;

internal AppendBlobStorageResource()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,12 @@ internal class BlockBlobStorageResource : StorageResourceItemInternal

public override string ProviderId => "blob";

/// <summary>
/// Defines the recommended Transfer Type of the storage resource.
/// </summary>
protected override DataTransferOrder TransferType => DataTransferOrder.Unordered;

/// <summary>
/// Store Max Initial Size that a Put Blob can get to.
/// </summary>
internal static long _maxInitialSize => Constants.Blob.Block.Pre_2019_12_12_MaxUploadBytes;
protected override long MaxSupportedSingleTransferSize => Constants.Blob.Block.MaxUploadBytes;

/// <summary>
/// Defines the maximum chunk size for the storage resource.
/// </summary>
protected override long MaxSupportedChunkSize => Constants.Blob.Block.MaxStageBytes;

/// <summary>
/// Length of the storage resource. This information is can obtained during a GetStorageResources API call.
///
/// Will return default if the length was not set by a GetStorageResources API call.
/// </summary>
protected override long? Length => ResourceProperties?.ResourceLength;

/// <summary>
Expand Down Expand Up @@ -168,7 +154,7 @@ await BlobClient.UploadAsync(
DataMovementBlobsExtensions.GetBlobUploadOptions(
_options,
overwrite,
_maxInitialSize,
MaxSupportedSingleTransferSize, // We don't want any internal partioning
options?.SourceProperties),
cancellationToken: cancellationToken).ConfigureAwait(false);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,12 @@ internal class PageBlobStorageResource : StorageResourceItemInternal

public override string ProviderId => "blob";

/// <summary>
/// Defines the recommended Transfer Type for the storage resource.
/// </summary>
protected override DataTransferOrder TransferType => DataTransferOrder.Unordered;

/// <summary>
/// Defines the maximum chunk size for the storage resource.
/// </summary>
protected override long MaxSupportedSingleTransferSize => Constants.Blob.Page.MaxPageBlockBytes;

protected override long MaxSupportedChunkSize => Constants.Blob.Page.MaxPageBlockBytes;

/// <summary>
/// Length of the storage resource. This information is obtained during a GetStorageResources API call.
///
/// Will return default if the length was not set by a GetStorageResources API call.
/// </summary>
protected override long? Length => ResourceProperties?.ResourceLength;

public PageBlobStorageResource()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ internal class ShareFileStorageResource : StorageResourceItemInternal

protected override DataTransferOrder TransferType => DataTransferOrder.Unordered;

protected override long MaxSupportedSingleTransferSize => DataMovementShareConstants.MaxRange;

protected override long MaxSupportedChunkSize => DataMovementShareConstants.MaxRange;

protected override long? Length => ResourceProperties?.ResourceLength;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ protected StorageResourceItem() { }
protected internal override bool IsContainer { get { throw null; } }
protected internal abstract long? Length { get; }
protected internal abstract long MaxSupportedChunkSize { get; }
protected internal abstract long MaxSupportedSingleTransferSize { get; }
protected internal abstract string ResourceId { get; }
protected internal Azure.Storage.DataMovement.StorageResourceItemProperties ResourceProperties { get { throw null; } set { } }
protected internal abstract Azure.Storage.DataMovement.DataTransferOrder TransferType { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ protected StorageResourceItem() { }
protected internal override bool IsContainer { get { throw null; } }
protected internal abstract long? Length { get; }
protected internal abstract long MaxSupportedChunkSize { get; }
protected internal abstract long MaxSupportedSingleTransferSize { get; }
protected internal abstract string ResourceId { get; }
protected internal Azure.Storage.DataMovement.StorageResourceItemProperties ResourceProperties { get { throw null; } set { } }
protected internal abstract Azure.Storage.DataMovement.DataTransferOrder TransferType { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class DataTransferOptions : IEquatable<DataTransferOptions>
/// The default value is 4 MiB.
/// <para/>
/// When resuming a transfer, the default value will be the value specified
/// when the transfer was first started.
/// when the transfer was first started but can still be overriden.
/// <para/>
/// This value may be clamped to the maximum allowed for the particular transfer/resource type.
/// </summary>
Expand All @@ -31,7 +31,7 @@ public class DataTransferOptions : IEquatable<DataTransferOptions>
/// The default value is 32 MiB.
/// <para/>
/// When resuming a transfer, the default value will be the value specified
/// when the transfer was first started.
/// when the transfer was first started but can still be overriden.
/// <para/>
/// This value may be clamped to the maximum allowed for the particular transfer/resource type.
/// </summary>
Expand Down
4 changes: 2 additions & 2 deletions sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,11 @@ internal JobPartInternal(
SingleTransferCompletedEventHandler = singleTransferEventHandler;
ClientDiagnostics = clientDiagnostics;

// Set transfer sizes to user specified values or default
// Set transfer sizes to user specified values or default,
// clamped to max supported chunk size for the destination.
_initialTransferSize = Math.Min(
initialTransferSize ?? DataMovementConstants.DefaultInitialTransferSize,
_destinationResource.MaxSupportedChunkSize);
_destinationResource.MaxSupportedSingleTransferSize);
_transferChunkSize = Math.Min(
transferChunkSize ?? DataMovementConstants.DefaultChunkSize,
_destinationResource.MaxSupportedChunkSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,12 @@ internal class LocalFileStorageResource : StorageResourceItem

public override string ProviderId => "local";

/// <summary>
/// Defines the recommended Transfer Type of the resource
/// </summary>
protected internal override DataTransferOrder TransferType => DataTransferOrder.Sequential;

/// <summary>
/// Defines the maximum chunk size for the storage resource.
/// </summary>
/// TODO: consider changing this.
protected internal override long MaxSupportedSingleTransferSize => Constants.Blob.Block.MaxStageBytes;

protected internal override long MaxSupportedChunkSize => Constants.Blob.Block.MaxStageBytes;

/// <summary>
/// Length of the storage resource. This information is can obtained during a GetStorageResources API call.
///
/// Will return default if the length was not set by a GetStorageResources API call.
/// </summary>
protected internal override long? Length => default;

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ protected StorageResourceItem() { }
/// </summary>
protected internal abstract DataTransferOrder TransferType { get; }

/// <summary>
/// Defines the maximum supported size for the storage resource to be created
/// in a single API call.
/// </summary>
protected internal abstract long MaxSupportedSingleTransferSize { get; }

/// <summary>
/// Defines the maximum supported chunk size for the storage resource.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ private Mock<StorageResourceItem> GetRemoteDestinationResource(bool throwOnDelet
.Returns("BlockBlob");
mock.Setup(b => b.ProviderId)
.Returns("blob");
mock.Setup(b => b.MaxSupportedSingleTransferSize)
.Returns(Constants.GB);
mock.Setup(b => b.MaxSupportedChunkSize)
.Returns(Constants.GB);
mock.Setup(b => b.GetSourceCheckpointData())
Expand Down Expand Up @@ -114,7 +116,8 @@ public async Task CleanupAfterFailureAsync()
destMock.Verify(b => b.Uri, Times.Exactly(6));
destMock.Verify(b => b.ProviderId, Times.Once());
destMock.Verify(b => b.ResourceId, Times.Once());
destMock.Verify(b => b.MaxSupportedChunkSize, Times.Exactly(2));
destMock.Verify(b => b.MaxSupportedSingleTransferSize, Times.Once());
destMock.Verify(b => b.MaxSupportedChunkSize, Times.Once());
destMock.Verify(b => b.GetDestinationCheckpointData(), Times.Once());
destMock.Verify(b => b.SetPermissionsAsync(
sourceMock.Object,
Expand Down Expand Up @@ -157,7 +160,8 @@ public async Task ErrorThrownDuringCleanup()
destMock.Verify(b => b.Uri, Times.Exactly(6));
destMock.Verify(b => b.ProviderId, Times.Once());
destMock.Verify(b => b.ResourceId, Times.Once());
destMock.Verify(b => b.MaxSupportedChunkSize, Times.Exactly(2));
destMock.Verify(b => b.MaxSupportedSingleTransferSize, Times.Once());
destMock.Verify(b => b.MaxSupportedChunkSize, Times.Once());
destMock.Verify(b => b.GetDestinationCheckpointData(), Times.Once());
destMock.Verify(b => b.SetPermissionsAsync(
sourceMock.Object,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ internal class MockStorageResourceItem : StorageResourceItem

protected internal override DataTransferOrder TransferType { get; }

protected internal override long MaxSupportedSingleTransferSize => Constants.GB;

protected internal override long MaxSupportedChunkSize => Constants.GB;

protected internal override long? Length { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public async Task ProcessPartToChunkAsync_OneShot()
.Returns(Task.CompletedTask);
mockDestination.Setup(resource => resource.CopyFromUriAsync(It.IsAny<StorageResourceItem>(), It.IsAny<bool>(), It.IsAny<long>(), It.IsAny<StorageResourceCopyFromUriOptions>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);
mockDestination.Setup(r => r.MaxSupportedSingleTransferSize).Returns(Constants.MB);
mockDestination.Setup(r => r.MaxSupportedChunkSize).Returns(Constants.MB);

// Set up default checkpointer with transfer job
Expand Down Expand Up @@ -203,6 +204,7 @@ public async Task ProcessPartToChunkAsync_Chunks()
.Returns(Task.CompletedTask);
mockDestination.Setup(resource => resource.CompleteTransferAsync(It.IsAny<bool>(), It.IsAny<StorageResourceCompleteTransferOptions>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);
mockDestination.Setup(r => r.MaxSupportedSingleTransferSize).Returns(length - 1);
mockDestination.Setup(r => r.MaxSupportedChunkSize).Returns(chunkSize);

// Set up default checkpointer with transfer job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ internal class MemoryStorageResourceItem : StorageResourceItem

protected internal override DataTransferOrder TransferType => DataTransferOrder.Unordered;

protected internal override long MaxSupportedSingleTransferSize => long.MaxValue;

protected internal override long MaxSupportedChunkSize => long.MaxValue;

protected internal override long? Length => Buffer.Length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ private T ThrowOrDo<T>(Func<T> func) => _throwScopeManager.InScope

protected internal override DataTransferOrder TransferType => ThrowOr(_inner.TransferType);

protected internal override long MaxSupportedSingleTransferSize => ThrowOr(_inner.MaxSupportedSingleTransferSize);

protected internal override long MaxSupportedChunkSize => ThrowOr(_inner.MaxSupportedChunkSize);

protected internal override long? Length => ThrowOr(_inner.Length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public async Task ProcessPartToChunkAsync_OneShot()
Mock<StorageResourceItem> mockDestination = GetServiceStorageResourceItem();
mockDestination.Setup(resource => resource.CopyFromStreamAsync(It.IsAny<Stream>(), It.IsAny<long>(), It.IsAny<bool>(), It.IsAny<long>(), It.IsAny<StorageResourceWriteToOffsetOptions>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);
mockDestination.Setup(r => r.MaxSupportedSingleTransferSize).Returns(Constants.MB);
mockDestination.Setup(r => r.MaxSupportedChunkSize).Returns(Constants.MB);

// Set up source with properties and read stream
Expand Down Expand Up @@ -211,7 +212,9 @@ public async Task ProcessPartToChunkAsync_Chunks()
.Returns(Task.CompletedTask);
mockDestination.Setup(resource => resource.CompleteTransferAsync(It.IsAny<bool>(), It.IsAny<StorageResourceCompleteTransferOptions>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);
mockDestination.Setup(r => r.MaxSupportedSingleTransferSize).Returns(length - 1);
mockDestination.Setup(r => r.MaxSupportedChunkSize).Returns(chunkSize);

var data = GetRandomBuffer(length);
using var stream = new MemoryStream(data);
using var stream2 = new MemoryStream(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ public static void BasicSetup(
items.Source.SetupGet(r => r.Length).Returns(itemSize);

items.Destination.SetupGet(r => r.TransferType).Returns(default(DataTransferOrder));
items.Destination.SetupGet(r => r.MaxSupportedSingleTransferSize).Returns(Constants.GB);
items.Destination.SetupGet(r => r.MaxSupportedChunkSize).Returns(Constants.GB);

items.Source.Setup(r => r.GetPropertiesAsync(It.IsAny<CancellationToken>()))
Expand Down Expand Up @@ -560,6 +561,7 @@ public static void VerifyDestinationResourceOnJobProcess(this Mock<StorageResour
{
dstResource.VerifyGet(r => r.Uri);
dstResource.VerifyGet(r => r.ResourceId);
dstResource.VerifyGet(r => r.MaxSupportedSingleTransferSize);
dstResource.VerifyGet(r => r.MaxSupportedChunkSize);
}

Expand Down

0 comments on commit a47c2f4

Please sign in to comment.