Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pull] main from Azure:main #803

Merged
merged 2 commits into from
Mar 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@

# ServiceLabel: %KeyVault
# AzureSdkOwners: @JonathanCrd
# ServiceOwners: @RandalliLama
# ServiceOwners: @cheathamb36 @chen-karen @lgonsoulin

# ServiceLabel: %Kubernetes Configuration
# ServiceOwners: @NarayanThiru
Expand Down
97 changes: 50 additions & 47 deletions sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,14 @@ public override async Task ProcessPartToChunkAsync()
return;
}
await OnTransferStateChangedAsync(TransferState.InProgress).ConfigureAwait(false);

if (!_sourceResource.Length.HasValue)
{
await UnknownDownloadInternal().ConfigureAwait(false);
await UnknownLengthDownloadAsync().ConfigureAwait(false);
}
else
{
await LengthKnownDownloadInternal().ConfigureAwait(false);
await KnownLengthDownloadAsync().ConfigureAwait(false);
}
}
catch (Exception ex)
Expand All @@ -220,36 +221,34 @@ public override async Task ProcessPartToChunkAsync()
}
}

internal async Task UnknownDownloadInternal()
internal async Task UnknownLengthDownloadAsync()
{
Task<StorageResourceReadStreamResult> initialTask = _sourceResource.ReadStreamAsync(
position: 0,
length: _initialTransferSize,
_cancellationToken);

try
{
StorageResourceReadStreamResult initialResult = default;
try
{
initialResult = await initialTask.ConfigureAwait(false);
initialResult = await _sourceResource.ReadStreamAsync(
position: 0,
length: _initialTransferSize,
_cancellationToken).ConfigureAwait(false);
}
catch
{
// Range not accepted, we need to attempt to use a default range
// This can happen if the source is empty.
initialResult = await _sourceResource.ReadStreamAsync(
cancellationToken: _cancellationToken)
.ConfigureAwait(false);
}
// If the initial request returned no content (i.e., a 304),
// we'll pass that back to the user immediately

long? initialLength = initialResult?.ContentLength;

// There needs to be at least 1 chunk to create the blob even if the
// length is 0 bytes.
if (initialResult == default || (initialLength ?? 0) == 0)
{
await CreateZeroLengthDownload().ConfigureAwait(false);
await QueueChunkToChannelAsync(ZeroLengthDownloadAsync).ConfigureAwait(false);
return;
}

Expand Down Expand Up @@ -285,41 +284,20 @@ internal async Task UnknownDownloadInternal()
}
}

internal async Task LengthKnownDownloadInternal()
internal async Task KnownLengthDownloadAsync()
{
long totalLength = _sourceResource.Length.Value;
if (totalLength == 0)
{
await CreateZeroLengthDownload().ConfigureAwait(false);
await QueueChunkToChannelAsync(ZeroLengthDownloadAsync).ConfigureAwait(false);
}
// Download with a single GET
else if (_initialTransferSize >= totalLength)
{
// To prevent requesting a range that is invalid when
// we already know the length we can just make one get blob request.
StorageResourceReadStreamResult result = await _sourceResource.
ReadStreamAsync(length: totalLength, cancellationToken: _cancellationToken)
await QueueChunkToChannelAsync(
async () =>
await DownloadSingleAsync(totalLength).ConfigureAwait(false))
.ConfigureAwait(false);

long downloadLength = result.ContentLength.Value;
// This should not occur but add a check just in case
if (downloadLength != totalLength)
{
throw Errors.SingleDownloadLengthMismatch(totalLength, downloadLength);
}

bool successfulCopy = await CopyToStreamInternal(
offset: 0,
sourceLength: downloadLength,
source: result.Content,
expectedLength: totalLength,
initial: true).ConfigureAwait(false);
if (successfulCopy)
{
await ReportBytesWrittenAsync(downloadLength).ConfigureAwait(false);
// Queue the work to end the download
await QueueCompleteFileDownload().ConfigureAwait(false);
}
}
// Download in chunks
else
Expand All @@ -328,7 +306,6 @@ internal async Task LengthKnownDownloadInternal()
}
}

#region PartitionedDownloader
private async Task QueueChunksToChannel(long initialLength, long totalLength)
{
// Create Download Chunk event handler to manage when the ranges finish downloading
Expand All @@ -352,7 +329,7 @@ private async Task QueueChunksToChannel(long initialLength, long totalLength)
// return before it's completed downloading)
await QueueChunkToChannelAsync(
async () =>
await DownloadStreamingInternal(range: httpRange).ConfigureAwait(false))
await DownloadChunkAsync(range: httpRange).ConfigureAwait(false))
.ConfigureAwait(false);
chunkCount++;
}
Expand All @@ -366,7 +343,7 @@ await DownloadStreamingInternal(range: httpRange).ConfigureAwait(false))
}
}

internal async Task CompleteFileDownload()
internal async Task CompleteFileDownloadAsync()
{
try
{
Expand All @@ -389,7 +366,35 @@ await _destinationResource.CompleteTransferAsync(
}
}

internal async Task DownloadStreamingInternal(HttpRange range)
private async Task DownloadSingleAsync(long totalLength)
{
// To prevent requesting a range that is invalid when
// we already know the length we can just make one get blob request.
StorageResourceReadStreamResult result = await _sourceResource
.ReadStreamAsync(length: totalLength, cancellationToken: _cancellationToken)
.ConfigureAwait(false);

long downloadLength = result.ContentLength.Value;
// This should not occur but add a check just in case
if (downloadLength != totalLength)
{
throw Errors.SingleDownloadLengthMismatch(totalLength, downloadLength);
}

bool successfulCopy = await CopyToStreamInternal(
offset: 0,
sourceLength: downloadLength,
source: result.Content,
expectedLength: totalLength,
initial: true).ConfigureAwait(false);
if (successfulCopy)
{
await ReportBytesWrittenAsync(downloadLength).ConfigureAwait(false);
await CompleteFileDownloadAsync().ConfigureAwait(false);
}
}

private async Task DownloadChunkAsync(HttpRange range)
{
try
{
Expand Down Expand Up @@ -478,7 +483,7 @@ private static DownloadChunkHandler.Behaviors GetDownloadChunkHandlerBehaviors(U

private Task QueueCompleteFileDownload()
{
return QueueChunkToChannelAsync(CompleteFileDownload);
return QueueChunkToChannelAsync(CompleteFileDownloadAsync);
}

private static IEnumerable<HttpRange> GetRanges(long initialLength, long totalLength, long rangeSize)
Expand All @@ -488,7 +493,6 @@ private static IEnumerable<HttpRange> GetRanges(long initialLength, long totalLe
yield return new HttpRange(offset, Math.Min(totalLength - offset, rangeSize));
}
}
#endregion PartitionedDownloader

public override async Task InvokeSkippedArgAsync()
{
Expand All @@ -509,7 +513,7 @@ public override async Task DisposeHandlersAsync()
}
}

private async Task CreateZeroLengthDownload()
private async Task ZeroLengthDownloadAsync()
{
// We just need to at minimum create the file
bool successfulCreation = await CopyToStreamInternal(
Expand All @@ -520,8 +524,7 @@ private async Task CreateZeroLengthDownload()
initial: true).ConfigureAwait(false);
if (successfulCreation)
{
// Queue the work to end the download
await QueueCompleteFileDownload().ConfigureAwait(false);
await CompleteFileDownloadAsync().ConfigureAwait(false);
}
else
{
Expand Down