Merge pull request dotnet#287 from dotnet/no-chunks-for-google
Refactor and fix Google Artifacts uploads
baronfel authored Jan 16, 2023
2 parents 0b29edc + 511d8cc commit 592c814
Showing 1 changed file: Microsoft.NET.Build.Containers/Registry.cs (102 additions, 56 deletions)
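For orientation before reading the diff: this commit splits Registry.UploadBlob into three steps (start an upload session, upload the blob contents, finish the session) and adds a whole-blob upload path for registries, such as Google Artifact Registry, that reject chunked uploads. The sketch below is an editor's illustration of that flow against the /v2/{name}/blobs/uploads/ endpoint, not code from the commit; the type and member names (BlobUploadSketch, UploadWholeAsync, Resolve) and the assumption of an already-authenticated HttpClient are invented for the example, and error handling is reduced to a status check.

// Illustrative sketch only (not the shipped code): the three-step blob upload flow,
// shown for the single-PATCH ("whole") path that registries without chunked-upload
// support require. Assumes `client` is already authenticated against `baseUri`.
using System;
using System.IO;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;

static class BlobUploadSketch
{
    public static async Task UploadWholeAsync(HttpClient client, Uri baseUri, string name, string digest, Stream contents)
    {
        // 1. Start an upload session; the registry answers 202 Accepted with a Location to upload to.
        HttpResponseMessage start = await client.PostAsync(new Uri(baseUri, $"/v2/{name}/blobs/uploads/"), content: null);
        Uri uploadUri = Resolve(baseUri, start.Headers.Location);

        // 2. Upload the blob contents in one PATCH (the chunked path instead sends a series of
        //    PATCHes, following the Location header returned by each response).
        StreamContent body = new StreamContent(contents);
        body.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
        body.Headers.ContentLength = contents.Length;
        HttpResponseMessage patch = await client.PatchAsync(uploadUri, body);
        uploadUri = Resolve(baseUri, patch.Headers.Location);

        // 3. Finish the session: PUT to the last Location with the digest as a query parameter.
        UriBuilder finalize = new UriBuilder(uploadUri);
        finalize.Query += $"&digest={Uri.EscapeDataString(digest)}";
        HttpResponseMessage put = await client.PutAsync(finalize.Uri, content: null);
        put.EnsureSuccessStatusCode();
    }

    // The next upload Location may come back absolute or relative to the registry base URI.
    private static Uri Resolve(Uri baseUri, Uri? location) =>
        location is { IsAbsoluteUri: true } ? location : new Uri(baseUri, location?.OriginalString ?? "");
}

Factoring the real code this way lets the chunked and whole-blob paths share the session start and finalize steps, with only UploadBlobContents branching on SupportsChunkedUpload. The commit diff follows.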
@@ -61,6 +61,27 @@ public readonly bool IsAmazonECRRegistry
}
}

+ /// <summary>
+ /// Check to see if the registry is for Google Artifact Registry.
+ /// </summary>
+ /// <remarks>
+ /// Google Artifact Registry locations (one for each availability zone) are of the form "ZONE-docker.pkg.dev".
+ /// </remarks>
+ public readonly bool IsGoogleArtifactRegistry {
+ get => RegistryName.EndsWith("-docker.pkg.dev");
+ }
+
+ /// <summary>
+ /// Google Artifact Registry doesn't support chunked upload, but we want the capability check to be agnostic to the target.
+ /// </summary>
+ private readonly bool SupportsChunkedUpload => !IsGoogleArtifactRegistry;
+
+ /// <summary>
+ /// Pushing to ECR uses a much larger chunk size. To avoid getting too many socket disconnects trying to do too many
+ /// parallel uploads be more conservative and upload one layer at a time.
+ /// </summary>
+ private readonly bool SupportsParallelUploads => !IsAmazonECRRegistry;

public async Task<Image> GetImageManifest(string name, string reference)
{
HttpClient client = GetClient();
@@ -143,42 +164,10 @@ public async Task Push(Layer layer, string name, Action<string> logProgressMessa
}
}

- private readonly async Task UploadBlob(string name, string digest, Stream contents)
- {
- HttpClient client = GetClient();
-
- if (await BlobAlreadyUploaded(name, digest, client))
- {
- // Already there!
- return;
- }
-
- Uri pushUri = new Uri(BaseUri, $"/v2/{name}/blobs/uploads/");
- HttpResponseMessage pushResponse = await client.PostAsync(pushUri, content: null);
-
- if (pushResponse.StatusCode != HttpStatusCode.Accepted)
- {
- string errorMessage = $"Failed to upload blob to {pushUri}; received {pushResponse.StatusCode} with detail {await pushResponse.Content.ReadAsStringAsync()}";
- throw new ApplicationException(errorMessage);
- }
-
- UriBuilder x;
- if (pushResponse.Headers.Location is {IsAbsoluteUri: true })
- {
- x = new UriBuilder(pushResponse.Headers.Location);
- }
- else
- {
- // if we don't trim the BaseUri and relative Uri of slashes, you can get invalid urls.
- // Uri constructor does this on our behalf.
- x = new UriBuilder(new Uri(BaseUri, pushResponse.Headers.Location?.OriginalString ?? ""));
- }
-
- Uri patchUri = x.Uri;
-
- x.Query += $"&digest={Uri.EscapeDataString(digest)}";
-
- Uri putUri = x.Uri;
+ private readonly async Task<UriBuilder> UploadBlobChunked(string name, string digest, Stream contents, HttpClient client, UriBuilder uploadUri) {
+ Uri patchUri = uploadUri.Uri;
+ var localUploadUri = new UriBuilder(uploadUri.Uri);
+ localUploadUri.Query += $"&digest={Uri.EscapeDataString(digest)}";

// TODO: this chunking is super tiny and probably not necessary; what does the docker client do
// and can we be smarter?
@@ -209,27 +198,66 @@ private readonly async Task UploadBlob(string name, string digest, Stream conten
throw new ApplicationException(errorMessage);
}

- if (patchResponse.Headers.Location is { IsAbsoluteUri: true })
- {
- x = new UriBuilder(patchResponse.Headers.Location);
- }
- else
- {
- // if we don't trim the BaseUri and relative Uri of slashes, you can get invalid urls.
- // Uri constructor does this on our behalf.
- x = new UriBuilder(new Uri(BaseUri, patchResponse.Headers.Location?.OriginalString ?? ""));
- }
+ localUploadUri = GetNextLocation(patchResponse);

- patchUri = x.Uri;
+ patchUri = localUploadUri.Uri;

chunkCount += 1;
chunkStart += bytesRead;
}
+ return new UriBuilder(patchUri);
+ }
+
+ private readonly UriBuilder GetNextLocation(HttpResponseMessage response) {
+ if (response.Headers.Location is {IsAbsoluteUri: true })
+ {
+ return new UriBuilder(response.Headers.Location);
+ }
+ else
+ {
+ // if we don't trim the BaseUri and relative Uri of slashes, you can get invalid urls.
+ // Uri constructor does this on our behalf.
+ return new UriBuilder(new Uri(BaseUri, response.Headers.Location?.OriginalString ?? ""));
+ }
+ }
+
+ private readonly async Task<UriBuilder> UploadBlobWhole(string name, string digest, Stream contents, HttpClient client, UriBuilder uploadUri) {
+ StreamContent content = new StreamContent(contents);
+ content.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
+ content.Headers.ContentLength = contents.Length;
+ HttpResponseMessage patchResponse = await client.PatchAsync(uploadUri.Uri, content);
+ if (patchResponse.StatusCode != HttpStatusCode.Accepted)
+ {
+ string errorMessage = $"Failed to upload to {uploadUri}; received {patchResponse.StatusCode} with detail {await patchResponse.Content.ReadAsStringAsync()}";
+ throw new ApplicationException(errorMessage);
+ }
+ return GetNextLocation(patchResponse);
+ }
+
+ private readonly async Task<UriBuilder> StartUploadSession(string name, string digest, HttpClient client) {
+ Uri startUploadUri = new Uri(BaseUri, $"/v2/{name}/blobs/uploads/");
+
+ HttpResponseMessage pushResponse = await client.PostAsync(startUploadUri, content: null);
+
+ if (pushResponse.StatusCode != HttpStatusCode.Accepted)
+ {
+ string errorMessage = $"Failed to upload blob to {startUploadUri}; received {pushResponse.StatusCode} with detail {await pushResponse.Content.ReadAsStringAsync()}";
+ throw new ApplicationException(errorMessage);
+ }
+
+ return GetNextLocation(pushResponse);
+ }
+
+ private readonly async Task<UriBuilder> UploadBlobContents(string name, string digest, Stream contents, HttpClient client, UriBuilder uploadUri) {
+ if (SupportsChunkedUpload) return await UploadBlobChunked(name, digest, contents, client, uploadUri);
+ else return await UploadBlobWhole(name, digest, contents, client, uploadUri);
+ }

+ private readonly async Task FinishUploadSession(string digest, HttpClient client, UriBuilder uploadUri) {
// PUT with digest to finalize
- x.Query += $"&digest={Uri.EscapeDataString(digest)}";
+ uploadUri.Query += $"&digest={Uri.EscapeDataString(digest)}";

- putUri = x.Uri;
+ var putUri = uploadUri.Uri;

HttpResponseMessage finalizeResponse = await client.PutAsync(putUri, content: null);

@@ -240,6 +268,26 @@ private readonly async Task UploadBlob(string name, string digest, Stream conten
}
}

+ private readonly async Task UploadBlob(string name, string digest, Stream contents)
+ {
+ HttpClient client = GetClient();
+
+ if (await BlobAlreadyUploaded(name, digest, client))
+ {
+ // Already there!
+ return;
+ }
+
+ // Three steps to this process:
+ // * start an upload session
+ var uploadUri = await StartUploadSession(name, digest, client);
+ // * upload the blob
+ var finalChunkUri = await UploadBlobContents(name, digest, contents, client, uploadUri);
+ // * finish the upload session
+ await FinishUploadSession(digest, client, finalChunkUri);
+
+ }

private readonly async Task<bool> BlobAlreadyUploaded(string name, string digest, HttpClient client)
{
HttpResponseMessage response = await client.SendAsync(new HttpRequestMessage(HttpMethod.Head, new Uri(BaseUri, $"/v2/{name}/blobs/{digest}")));
@@ -320,19 +368,17 @@ public async Task Push(Image x, string name, string? tag, string baseName, Actio
}
};

- // Pushing to ECR uses a much larger chunk size. To avoid getting too many socket disconnects trying to do too many
- // parallel uploads be more conservative and upload one layer at a time.
- if(IsAmazonECRRegistry)
+ if (SupportsParallelUploads)
{
+ await Task.WhenAll(x.LayerDescriptors.Select(descriptor => uploadLayerFunc(descriptor)));
+ }
+ else
+ {
foreach(var descriptor in x.LayerDescriptors)
{
await uploadLayerFunc(descriptor);
}
}
- else
- {
- await Task.WhenAll(x.LayerDescriptors.Select(descriptor => uploadLayerFunc(descriptor)));
- }

using (MemoryStream stringStream = new MemoryStream(Encoding.UTF8.GetBytes(x.config.ToJsonString())))
{
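Editor's note on how the new capability checks play out (the hostnames here are illustrative, not taken from the commit): a Google Artifact Registry host ends in "-docker.pkg.dev", so IsGoogleArtifactRegistry is true, SupportsChunkedUpload is false, and UploadBlobContents takes the single-PATCH UploadBlobWhole path; for Amazon ECR, SupportsParallelUploads is false, so layers are pushed one at a time rather than with Task.WhenAll. A minimal, hypothetical mirror of the suffix check added above:

// Hypothetical, self-contained mirror of the IsGoogleArtifactRegistry check; the hostname is an example.
string registryName = "us-west1-docker.pkg.dev";
bool isGoogleArtifactRegistry = registryName.EndsWith("-docker.pkg.dev"); // true
bool supportsChunkedUpload = !isGoogleArtifactRegistry;                   // false -> whole-blob (single PATCH) upload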
