Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage][DataMovement] Fixes for single transfer size #46875

Merged
merged 3 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,12 @@ internal class AppendBlobStorageResource : StorageResourceItemInternal

public override string ProviderId => "blob";

/// <summary>
/// Defines the recommended Transfer Type for the storage resource.
/// </summary>
protected override DataTransferOrder TransferType => DataTransferOrder.Sequential;

/// <summary>
/// Defines the maximum chunk size for the storage resource.
/// </summary>
protected override long MaxSupportedSingleTransferSize => Constants.Blob.Append.MaxAppendBlockBytes;

protected override long MaxSupportedChunkSize => Constants.Blob.Append.MaxAppendBlockBytes;

/// <summary>
/// Length of the storage resource. This information is obtained during a GetStorageResources API call.
///
/// Will return default if the length was not set by a GetStorageResources API call.
/// </summary>
protected override long? Length => ResourceProperties?.ResourceLength;

internal AppendBlobStorageResource()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,12 @@ internal class BlockBlobStorageResource : StorageResourceItemInternal

public override string ProviderId => "blob";

/// <summary>
/// Defines the recommended Transfer Type of the storage resource.
/// </summary>
protected override DataTransferOrder TransferType => DataTransferOrder.Unordered;

/// <summary>
/// Store Max Initial Size that a Put Blob can get to.
/// </summary>
internal static long _maxInitialSize => Constants.Blob.Block.Pre_2019_12_12_MaxUploadBytes;
protected override long MaxSupportedSingleTransferSize => Constants.Blob.Block.MaxUploadBytes;

/// <summary>
/// Defines the maximum chunk size for the storage resource.
/// </summary>
protected override long MaxSupportedChunkSize => Constants.Blob.Block.MaxStageBytes;

/// <summary>
/// Length of the storage resource. This information is can obtained during a GetStorageResources API call.
///
/// Will return default if the length was not set by a GetStorageResources API call.
/// </summary>
protected override long? Length => ResourceProperties?.ResourceLength;

/// <summary>
Expand Down Expand Up @@ -168,7 +154,7 @@ await BlobClient.UploadAsync(
DataMovementBlobsExtensions.GetBlobUploadOptions(
_options,
overwrite,
_maxInitialSize,
MaxSupportedSingleTransferSize, // We don't want any internal partioning
options?.SourceProperties),
cancellationToken: cancellationToken).ConfigureAwait(false);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,12 @@ internal class PageBlobStorageResource : StorageResourceItemInternal

public override string ProviderId => "blob";

/// <summary>
/// Defines the recommended Transfer Type for the storage resource.
/// </summary>
protected override DataTransferOrder TransferType => DataTransferOrder.Unordered;

/// <summary>
/// Defines the maximum chunk size for the storage resource.
/// </summary>
protected override long MaxSupportedSingleTransferSize => Constants.Blob.Page.MaxPageBlockBytes;

protected override long MaxSupportedChunkSize => Constants.Blob.Page.MaxPageBlockBytes;

/// <summary>
/// Length of the storage resource. This information is obtained during a GetStorageResources API call.
///
/// Will return default if the length was not set by a GetStorageResources API call.
/// </summary>
protected override long? Length => ResourceProperties?.ResourceLength;

public PageBlobStorageResource()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ internal class ShareFileStorageResource : StorageResourceItemInternal

protected override DataTransferOrder TransferType => DataTransferOrder.Unordered;

protected override long MaxSupportedSingleTransferSize => DataMovementShareConstants.MaxRange;

protected override long MaxSupportedChunkSize => DataMovementShareConstants.MaxRange;

protected override long? Length => ResourceProperties?.ResourceLength;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ protected StorageResourceItem() { }
protected internal override bool IsContainer { get { throw null; } }
protected internal abstract long? Length { get; }
protected internal abstract long MaxSupportedChunkSize { get; }
protected internal abstract long MaxSupportedSingleTransferSize { get; }
protected internal abstract string ResourceId { get; }
protected internal Azure.Storage.DataMovement.StorageResourceItemProperties ResourceProperties { get { throw null; } set { } }
protected internal abstract Azure.Storage.DataMovement.DataTransferOrder TransferType { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ protected StorageResourceItem() { }
protected internal override bool IsContainer { get { throw null; } }
protected internal abstract long? Length { get; }
protected internal abstract long MaxSupportedChunkSize { get; }
protected internal abstract long MaxSupportedSingleTransferSize { get; }
protected internal abstract string ResourceId { get; }
protected internal Azure.Storage.DataMovement.StorageResourceItemProperties ResourceProperties { get { throw null; } set { } }
protected internal abstract Azure.Storage.DataMovement.DataTransferOrder TransferType { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class DataTransferOptions : IEquatable<DataTransferOptions>
/// The default value is 4 MiB.
/// <para/>
/// When resuming a transfer, the default value will be the value specified
/// when the transfer was first started.
/// when the transfer was first started but can still be overriden.
/// <para/>
/// This value may be clamped to the maximum allowed for the particular transfer/resource type.
/// </summary>
Expand All @@ -31,7 +31,7 @@ public class DataTransferOptions : IEquatable<DataTransferOptions>
/// The default value is 32 MiB.
/// <para/>
/// When resuming a transfer, the default value will be the value specified
/// when the transfer was first started.
/// when the transfer was first started but can still be overriden.
/// <para/>
/// This value may be clamped to the maximum allowed for the particular transfer/resource type.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,11 @@ internal JobPartInternal(
SingleTransferCompletedEventHandler = singleTransferEventHandler;
ClientDiagnostics = clientDiagnostics;

// Set transfer sizes to user specified values or default
// Set transfer sizes to user specified values or default,
// clamped to max supported chunk size for the destination.
_initialTransferSize = Math.Min(
initialTransferSize ?? DataMovementConstants.DefaultInitialTransferSize,
_destinationResource.MaxSupportedChunkSize);
_destinationResource.MaxSupportedSingleTransferSize);
_transferChunkSize = Math.Min(
transferChunkSize ?? DataMovementConstants.DefaultChunkSize,
_destinationResource.MaxSupportedChunkSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,12 @@ internal class LocalFileStorageResource : StorageResourceItem

public override string ProviderId => "local";

/// <summary>
/// Defines the recommended Transfer Type of the resource
/// </summary>
protected internal override DataTransferOrder TransferType => DataTransferOrder.Sequential;

/// <summary>
/// Defines the maximum chunk size for the storage resource.
/// </summary>
/// TODO: consider changing this.
protected internal override long MaxSupportedSingleTransferSize => Constants.Blob.Block.MaxStageBytes;

protected internal override long MaxSupportedChunkSize => Constants.Blob.Block.MaxStageBytes;

/// <summary>
/// Length of the storage resource. This information is can obtained during a GetStorageResources API call.
///
/// Will return default if the length was not set by a GetStorageResources API call.
/// </summary>
protected internal override long? Length => default;

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ protected StorageResourceItem() { }
/// </summary>
protected internal abstract DataTransferOrder TransferType { get; }

/// <summary>
/// Defines the maximum supported size for the storage resource to be created
/// in a single API call.
/// </summary>
protected internal abstract long MaxSupportedSingleTransferSize { get; }

/// <summary>
/// Defines the maximum supported chunk size for the storage resource.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ private Mock<StorageResourceItem> GetRemoteDestinationResource(bool throwOnDelet
.Returns("BlockBlob");
mock.Setup(b => b.ProviderId)
.Returns("blob");
mock.Setup(b => b.MaxSupportedSingleTransferSize)
.Returns(Constants.GB);
mock.Setup(b => b.MaxSupportedChunkSize)
.Returns(Constants.GB);
mock.Setup(b => b.GetSourceCheckpointData())
Expand Down Expand Up @@ -114,7 +116,8 @@ public async Task CleanupAfterFailureAsync()
destMock.Verify(b => b.Uri, Times.Exactly(6));
destMock.Verify(b => b.ProviderId, Times.Once());
destMock.Verify(b => b.ResourceId, Times.Once());
destMock.Verify(b => b.MaxSupportedChunkSize, Times.Exactly(2));
destMock.Verify(b => b.MaxSupportedSingleTransferSize, Times.Once());
destMock.Verify(b => b.MaxSupportedChunkSize, Times.Once());
destMock.Verify(b => b.GetDestinationCheckpointData(), Times.Once());
destMock.Verify(b => b.SetPermissionsAsync(
sourceMock.Object,
Expand Down Expand Up @@ -157,7 +160,8 @@ public async Task ErrorThrownDuringCleanup()
destMock.Verify(b => b.Uri, Times.Exactly(6));
destMock.Verify(b => b.ProviderId, Times.Once());
destMock.Verify(b => b.ResourceId, Times.Once());
destMock.Verify(b => b.MaxSupportedChunkSize, Times.Exactly(2));
destMock.Verify(b => b.MaxSupportedSingleTransferSize, Times.Once());
destMock.Verify(b => b.MaxSupportedChunkSize, Times.Once());
destMock.Verify(b => b.GetDestinationCheckpointData(), Times.Once());
destMock.Verify(b => b.SetPermissionsAsync(
sourceMock.Object,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ internal class MockStorageResourceItem : StorageResourceItem

protected internal override DataTransferOrder TransferType { get; }

protected internal override long MaxSupportedSingleTransferSize => Constants.GB;

protected internal override long MaxSupportedChunkSize => Constants.GB;

protected internal override long? Length { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public async Task ProcessPartToChunkAsync_OneShot()
.Returns(Task.CompletedTask);
mockDestination.Setup(resource => resource.CopyFromUriAsync(It.IsAny<StorageResourceItem>(), It.IsAny<bool>(), It.IsAny<long>(), It.IsAny<StorageResourceCopyFromUriOptions>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);
mockDestination.Setup(r => r.MaxSupportedSingleTransferSize).Returns(Constants.MB);
mockDestination.Setup(r => r.MaxSupportedChunkSize).Returns(Constants.MB);

// Set up default checkpointer with transfer job
Expand Down Expand Up @@ -203,6 +204,7 @@ public async Task ProcessPartToChunkAsync_Chunks()
.Returns(Task.CompletedTask);
mockDestination.Setup(resource => resource.CompleteTransferAsync(It.IsAny<bool>(), It.IsAny<StorageResourceCompleteTransferOptions>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);
mockDestination.Setup(r => r.MaxSupportedSingleTransferSize).Returns(length - 1);
mockDestination.Setup(r => r.MaxSupportedChunkSize).Returns(chunkSize);

// Set up default checkpointer with transfer job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ internal class MemoryStorageResourceItem : StorageResourceItem

protected internal override DataTransferOrder TransferType => DataTransferOrder.Unordered;

protected internal override long MaxSupportedSingleTransferSize => long.MaxValue;

protected internal override long MaxSupportedChunkSize => long.MaxValue;

protected internal override long? Length => Buffer.Length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ private T ThrowOrDo<T>(Func<T> func) => _throwScopeManager.InScope

protected internal override DataTransferOrder TransferType => ThrowOr(_inner.TransferType);

protected internal override long MaxSupportedSingleTransferSize => ThrowOr(_inner.MaxSupportedSingleTransferSize);

protected internal override long MaxSupportedChunkSize => ThrowOr(_inner.MaxSupportedChunkSize);

protected internal override long? Length => ThrowOr(_inner.Length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public async Task ProcessPartToChunkAsync_OneShot()
Mock<StorageResourceItem> mockDestination = GetServiceStorageResourceItem();
mockDestination.Setup(resource => resource.CopyFromStreamAsync(It.IsAny<Stream>(), It.IsAny<long>(), It.IsAny<bool>(), It.IsAny<long>(), It.IsAny<StorageResourceWriteToOffsetOptions>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);
mockDestination.Setup(r => r.MaxSupportedSingleTransferSize).Returns(Constants.MB);
mockDestination.Setup(r => r.MaxSupportedChunkSize).Returns(Constants.MB);

// Set up source with properties and read stream
Expand Down Expand Up @@ -211,7 +212,9 @@ public async Task ProcessPartToChunkAsync_Chunks()
.Returns(Task.CompletedTask);
mockDestination.Setup(resource => resource.CompleteTransferAsync(It.IsAny<bool>(), It.IsAny<StorageResourceCompleteTransferOptions>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);
mockDestination.Setup(r => r.MaxSupportedSingleTransferSize).Returns(length - 1);
mockDestination.Setup(r => r.MaxSupportedChunkSize).Returns(chunkSize);

var data = GetRandomBuffer(length);
using var stream = new MemoryStream(data);
using var stream2 = new MemoryStream(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ public static void BasicSetup(
items.Source.SetupGet(r => r.Length).Returns(itemSize);

items.Destination.SetupGet(r => r.TransferType).Returns(default(DataTransferOrder));
items.Destination.SetupGet(r => r.MaxSupportedSingleTransferSize).Returns(Constants.GB);
items.Destination.SetupGet(r => r.MaxSupportedChunkSize).Returns(Constants.GB);

items.Source.Setup(r => r.GetPropertiesAsync(It.IsAny<CancellationToken>()))
Expand Down Expand Up @@ -560,6 +561,7 @@ public static void VerifyDestinationResourceOnJobProcess(this Mock<StorageResour
{
dstResource.VerifyGet(r => r.Uri);
dstResource.VerifyGet(r => r.ResourceId);
dstResource.VerifyGet(r => r.MaxSupportedSingleTransferSize);
dstResource.VerifyGet(r => r.MaxSupportedChunkSize);
}

Expand Down