diff --git a/.travis.yml b/.travis.yml index 2788a5f56..c9723da50 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,6 +7,7 @@ language: go: - "1.10" + - "1.11" git: depth: 1 diff --git a/pkg/crane/append.go b/pkg/crane/append.go index 76c71f368..eb5f45def 100644 --- a/pkg/crane/append.go +++ b/pkg/crane/append.go @@ -18,13 +18,12 @@ import ( "log" "net/http" - "github.com/spf13/cobra" - "github.com/google/go-containerregistry/pkg/authn" "github.com/google/go-containerregistry/pkg/name" "github.com/google/go-containerregistry/pkg/v1/mutate" "github.com/google/go-containerregistry/pkg/v1/remote" "github.com/google/go-containerregistry/pkg/v1/tarball" + "github.com/spf13/cobra" ) func init() { Root.AddCommand(NewCmdAppend()) } diff --git a/pkg/v1/mutate/mutate.go b/pkg/v1/mutate/mutate.go index b24d6896b..5fc792d09 100644 --- a/pkg/v1/mutate/mutate.go +++ b/pkg/v1/mutate/mutate.go @@ -29,6 +29,7 @@ import ( "github.com/google/go-containerregistry/pkg/v1" "github.com/google/go-containerregistry/pkg/v1/empty" "github.com/google/go-containerregistry/pkg/v1/partial" + "github.com/google/go-containerregistry/pkg/v1/stream" "github.com/google/go-containerregistry/pkg/v1/tarball" "github.com/google/go-containerregistry/pkg/v1/types" "github.com/google/go-containerregistry/pkg/v1/v1util" @@ -58,77 +59,14 @@ func Append(base v1.Image, adds ...Addendum) (v1.Image, error) { if len(adds) == 0 { return base, nil } - if err := validate(adds); err != nil { return nil, err } - m, err := base.Manifest() - if err != nil { - return nil, err - } - - cf, err := base.ConfigFile() - if err != nil { - return nil, err - } - - image := &image{ - Image: base, - manifest: m.DeepCopy(), - configFile: cf.DeepCopy(), - diffIDMap: make(map[v1.Hash]v1.Layer), - digestMap: make(map[v1.Hash]v1.Layer), - } - - diffIDs := image.configFile.RootFS.DiffIDs - history := image.configFile.History - - for _, add := range adds { - diffID, err := add.Layer.DiffID() - if err != nil { - return nil, err - } - diffIDs 
= append(diffIDs, diffID) - history = append(history, add.History) - image.diffIDMap[diffID] = add.Layer - } - - manifestLayers := image.manifest.Layers - - for _, add := range adds { - d := v1.Descriptor{ - MediaType: types.DockerLayer, - } - - if d.Size, err = add.Layer.Size(); err != nil { - return nil, err - } - - if d.Digest, err = add.Layer.Digest(); err != nil { - return nil, err - } - - manifestLayers = append(manifestLayers, d) - image.digestMap[d.Digest] = add.Layer - } - - image.configFile.RootFS.DiffIDs = diffIDs - image.configFile.History = history - image.manifest.Layers = manifestLayers - - rcfg, err := image.RawConfigFile() - if err != nil { - return nil, err - } - d, sz, err := v1.SHA256(bytes.NewBuffer(rcfg)) - if err != nil { - return nil, err - } - image.manifest.Config.Digest = d - image.manifest.Config.Size = sz - - return image, nil + return &image{ + base: base, + adds: adds, + }, nil } // Config mutates the provided v1.Image to have the provided v1.Config @@ -150,10 +88,8 @@ func configFile(base v1.Image, cfg *v1.ConfigFile) (v1.Image, error) { } image := &image{ - Image: base, - manifest: m.DeepCopy(), - configFile: cfg, - digestMap: make(map[v1.Hash]v1.Layer), + base: base, + manifest: m.DeepCopy(), } rcfg, err := image.RawConfigFile() @@ -166,6 +102,7 @@ func configFile(base v1.Image, cfg *v1.ConfigFile) (v1.Image, error) { } image.manifest.Config.Digest = d image.manifest.Config.Size = sz + image.configFile = cfg return image, nil } @@ -183,16 +120,113 @@ func CreatedAt(base v1.Image, created v1.Time) (v1.Image, error) { } type image struct { - v1.Image + base v1.Image + adds []Addendum + + computed bool configFile *v1.ConfigFile manifest *v1.Manifest diffIDMap map[v1.Hash]v1.Layer digestMap map[v1.Hash]v1.Layer } +var _ v1.Image = (*image)(nil) + +func (i *image) MediaType() (types.MediaType, error) { return i.base.MediaType() } + +func (i *image) compute() error { + // Don't re-compute if already computed. 
+ if i.computed { + return nil + } + cf, err := i.base.ConfigFile() + if err != nil { + return err + } + configFile := cf.DeepCopy() + diffIDs := configFile.RootFS.DiffIDs + history := configFile.History + + diffIDMap := make(map[v1.Hash]v1.Layer) + digestMap := make(map[v1.Hash]v1.Layer) + + for _, add := range i.adds { + diffID, err := add.Layer.DiffID() + if err != nil { + return err + } + diffIDs = append(diffIDs, diffID) + history = append(history, add.History) + diffIDMap[diffID] = add.Layer + } + + m, err := i.base.Manifest() + if err != nil { + return err + } + manifest := m.DeepCopy() + manifestLayers := manifest.Layers + for _, add := range i.adds { + d := v1.Descriptor{ + MediaType: types.DockerLayer, + } + + var err error + if d.Size, err = add.Layer.Size(); err != nil { + return err + } + + if d.Digest, err = add.Layer.Digest(); err != nil { + return err + } + + manifestLayers = append(manifestLayers, d) + digestMap[d.Digest] = add.Layer + } + + configFile.RootFS.DiffIDs = diffIDs + configFile.History = history + + manifest.Layers = manifestLayers + + rcfg, err := json.Marshal(configFile) + if err != nil { + return err + } + d, sz, err := v1.SHA256(bytes.NewBuffer(rcfg)) + if err != nil { + return err + } + manifest.Config.Digest = d + manifest.Config.Size = sz + + i.configFile = configFile + i.manifest = manifest + i.diffIDMap = diffIDMap + i.digestMap = digestMap + i.computed = true + return nil +} + // Layers returns the ordered collection of filesystem layers that comprise this image. // The order of the list is oldest/base layer first, and most-recent/top layer last. func (i *image) Layers() ([]v1.Layer, error) { + if err := i.compute(); err == stream.ErrNotComputed { + // Image contains a streamable layer which has not yet been + // consumed. Just return the layers we have in case the caller + // is going to consume the layers. 
+ layers, err := i.base.Layers() + if err != nil { + return nil, err + } + for _, add := range i.adds { + layers = append(layers, add.Layer) + } + return layers, nil + } else if err != nil { + return nil, err + } + diffIDs, err := partial.DiffIDs(i) if err != nil { return nil, err @@ -210,36 +244,57 @@ func (i *image) Layers() ([]v1.Layer, error) { // BlobSet returns an unordered collection of all the blobs in the image. func (i *image) BlobSet() (map[v1.Hash]struct{}, error) { + if err := i.compute(); err != nil { + return nil, err + } return partial.BlobSet(i) } // ConfigName returns the hash of the image's config file. func (i *image) ConfigName() (v1.Hash, error) { + if err := i.compute(); err != nil { + return v1.Hash{}, err + } return partial.ConfigName(i) } // ConfigFile returns this image's config file. func (i *image) ConfigFile() (*v1.ConfigFile, error) { + if err := i.compute(); err != nil { + return nil, err + } return i.configFile, nil } // RawConfigFile returns the serialized bytes of ConfigFile() func (i *image) RawConfigFile() ([]byte, error) { + if err := i.compute(); err != nil { + return nil, err + } return json.Marshal(i.configFile) } // Digest returns the sha256 of this image's manifest. func (i *image) Digest() (v1.Hash, error) { + if err := i.compute(); err != nil { + return v1.Hash{}, err + } return partial.Digest(i) } // Manifest returns this image's Manifest object. 
func (i *image) Manifest() (*v1.Manifest, error) { + if err := i.compute(); err != nil { + return nil, err + } return i.manifest, nil } // RawManifest returns the serialized bytes of Manifest() func (i *image) RawManifest() ([]byte, error) { + if err := i.compute(); err != nil { + return nil, err + } return json.Marshal(i.manifest) } @@ -254,7 +309,7 @@ func (i *image) LayerByDigest(h v1.Hash) (v1.Layer, error) { if layer, ok := i.digestMap[h]; ok { return layer, nil } - return i.Image.LayerByDigest(h) + return i.base.LayerByDigest(h) } // LayerByDiffID is an analog to LayerByDigest, looking up by "diff id" @@ -263,7 +318,7 @@ func (i *image) LayerByDiffID(h v1.Hash) (v1.Layer, error) { if layer, ok := i.diffIDMap[h]; ok { return layer, nil } - return i.Image.LayerByDiffID(h) + return i.base.LayerByDiffID(h) } func validate(adds []Addendum) error { diff --git a/pkg/v1/mutate/mutate_test.go b/pkg/v1/mutate/mutate_test.go index 0f94e1a94..fd8f56d50 100644 --- a/pkg/v1/mutate/mutate_test.go +++ b/pkg/v1/mutate/mutate_test.go @@ -30,6 +30,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-containerregistry/pkg/v1" + "github.com/google/go-containerregistry/pkg/v1/stream" "github.com/google/go-containerregistry/pkg/v1/tarball" ) @@ -233,19 +234,19 @@ func TestMutateConfig(t *testing.T) { } if manifestsAreEqual(t, source, result) { - t.Fatal("mutating the config MUST mutate the manifest") + t.Error("mutating the config MUST mutate the manifest") } if configFilesAreEqual(t, source, result) { - t.Fatal("mutating the config did not mutate the config file") + t.Error("mutating the config did not mutate the config file") } if configSizesAreEqual(t, source, result) { - t.Fatal("adding an enviornment variable MUST change the config file size") + t.Error("adding an environment variable MUST change the config file size") } if !reflect.DeepEqual(cfg.Config.Env, newEnv) { - t.Fatalf("incorrect environment set %v!=%v", cfg.Config.Env, newEnv) + t.Errorf("incorrect 
environment set %v!=%v", cfg.Config.Env, newEnv) } } @@ -254,16 +255,16 @@ func TestMutateCreatedAt(t *testing.T) { want := time.Now().Add(-2 * time.Minute) result, err := CreatedAt(source, v1.Time{want}) if err != nil { - t.Fatalf("failed to mutate a config: %v", err) + t.Fatalf("CreatedAt: %v", err) } if configDigestsAreEqual(t, source, result) { - t.Fatal("mutating the created time MUST mutate the config digest") + t.Errorf("mutating the created time MUST mutate the config digest") } got := getConfigFile(t, result).Created.Time if got != want { - t.Fatalf("mutating the created time MUST mutate the time from %v to %v", got, want) + t.Errorf("mutating the created time MUST mutate the time from %v to %v", got, want) } } @@ -301,6 +302,74 @@ func TestLayerTime(t *testing.T) { } } +func TestAppendStreamableLayer(t *testing.T) { + img, err := AppendLayers( + sourceImage(t), + stream.NewLayer(ioutil.NopCloser(strings.NewReader(strings.Repeat("a", 100)))), + stream.NewLayer(ioutil.NopCloser(strings.NewReader(strings.Repeat("b", 100)))), + stream.NewLayer(ioutil.NopCloser(strings.NewReader(strings.Repeat("c", 100)))), + ) + if err != nil { + t.Fatalf("AppendLayers: %v", err) + } + + // Until the streams are consumed, the image manifest is not yet computed. + if _, err := img.Manifest(); err != stream.ErrNotComputed { + t.Errorf("Manifest: got %v, want %v", err, stream.ErrNotComputed) + } + + // We can still get Layers while some are not yet computed. 
+ ls, err := img.Layers() + if err != nil { + t.Errorf("Layers: %v", err) + } + wantDigests := []string{ + "sha256:bfa1c600931132f55789459e2f5a5eb85659ac91bc5a54ce09e3ed14809f8a7f", + "sha256:77a52b9a141dcc4d3d277d053193765dca725626f50eaf56b903ac2439cf7fd1", + "sha256:b78472d63f6e3d31059819173b56fcb0d9479a2b13c097d4addd84889f6aff06", + } + for i, l := range ls[1:] { + rc, err := l.Compressed() + if err != nil { + t.Errorf("Layer %d Compressed: %v", i, err) + } + + // Consume the layer's stream and close it to compute the + // layer's metadata. + if _, err := io.Copy(ioutil.Discard, rc); err != nil { + t.Errorf("Reading layer %d: %v", i, err) + } + if err := rc.Close(); err != nil { + t.Errorf("Closing layer %d: %v", i, err) + } + + // The layer's metadata is now available. + h, err := l.Digest() + if err != nil { + t.Errorf("Digest after consuming layer %d: %v", i, err) + } + if h.String() != wantDigests[i] { + t.Errorf("Layer %d digest got %q, want %q", i, h, wantDigests[i]) + } + } + + // Now that the streamable layers have been consumed, the image's + // manifest can be computed. 
+ if _, err := img.Manifest(); err != nil { + t.Errorf("Manifest: %v", err) + } + + h, err := img.Digest() + if err != nil { + t.Errorf("Digest: %v", err) + } + wantDigest := "sha256:4138c8f7156b863ae1f6b08726ce020650d5445d43b13ad42d04df98e0ab1fed" + if h.String() != wantDigest { + t.Errorf("Image digest got %q, want %q", h, wantDigest) + } + +} + func assertMTime(t *testing.T, layer v1.Layer, expectedTime time.Time) { l, err := layer.Uncompressed() diff --git a/pkg/v1/random/image.go b/pkg/v1/random/image.go index 2f9930fbd..698b55580 100644 --- a/pkg/v1/random/image.go +++ b/pkg/v1/random/image.go @@ -54,8 +54,9 @@ func Image(byteSize, layers int64) (v1.Image, error) { var b bytes.Buffer tw := tar.NewWriter(&b) if err := tw.WriteHeader(&tar.Header{ - Name: fmt.Sprintf("random_file_%d.txt", i), - Size: byteSize, + Name: fmt.Sprintf("random_file_%d.txt", i), + Size: byteSize, + Typeflag: tar.TypeRegA, }); err != nil { return nil, err } diff --git a/pkg/v1/remote/write.go b/pkg/v1/remote/write.go index 1fd633c0d..839d2b3be 100644 --- a/pkg/v1/remote/write.go +++ b/pkg/v1/remote/write.go @@ -18,6 +18,7 @@ import ( "bytes" "errors" "fmt" + "io" "log" "net/http" "net/url" @@ -25,7 +26,10 @@ import ( "github.com/google/go-containerregistry/pkg/authn" "github.com/google/go-containerregistry/pkg/name" "github.com/google/go-containerregistry/pkg/v1" + "github.com/google/go-containerregistry/pkg/v1/partial" "github.com/google/go-containerregistry/pkg/v1/remote/transport" + "github.com/google/go-containerregistry/pkg/v1/stream" + "golang.org/x/sync/errgroup" ) // Write pushes the provided img to the specified image reference. 
@@ -41,36 +45,31 @@ func Write(ref name.Reference, img v1.Image, auth authn.Authenticator, t http.Ro return err } w := writer{ - ref: ref, - client: &http.Client{Transport: tr}, - img: img, + ref: ref, + client: &http.Client{Transport: tr}, + img: img, } - bs, err := img.BlobSet() - if err != nil { - return err + // Upload individual layers in goroutines and collect any errors. + var g errgroup.Group + for _, l := range ls { + l := l + g.Go(func() error { + return w.uploadOne(l) + }) } - - // Spin up go routines to publish each of the members of BlobSet(), - // and use an error channel to collect their results. - errCh := make(chan error) - defer close(errCh) - for h := range bs { - go func(h v1.Hash) { - errCh <- w.uploadOne(h) - }(h) + if err := g.Wait(); err != nil { + return err } - // Now wait for all of the blob uploads to complete. - var errors []error - for _ = range bs { - if err := <-errCh; err != nil { - errors = append(errors, err) - } + // Now that all the layers are uploaded, upload the config file blob. + // This must be done last because some layers may have been streamed. + l, err := partial.ConfigLayer(img) + if err != nil { + return err } - if len(errors) > 0 { - // Return the first error we encountered. - return errors[0] + if err := w.uploadOne(l); err != nil { + return err } // With all of the constituent elements uploaded, upload the manifest @@ -80,9 +79,9 @@ func Write(ref name.Reference, img v1.Image, auth authn.Authenticator, t http.Ro // writer writes the elements of an image to a remote image reference. type writer struct { - ref name.Reference - client *http.Client - img v1.Image + ref name.Reference + client *http.Client + img v1.Image } // url returns a url.Url for the specified path in the context of this remote image reference. 
@@ -136,20 +135,14 @@ func (w *writer) checkExisting(h v1.Hash) (bool, error) { // On success, the layer was either mounted (nothing more to do) or a blob // upload was initiated and the body of that blob should be sent to the returned // location. -func (w *writer) initiateUpload(h v1.Hash) (location string, mounted bool, err error) { +func (w *writer) initiateUpload(from, mount string) (location string, mounted bool, err error) { u := w.url(fmt.Sprintf("/v2/%s/blobs/uploads/", w.ref.Context().RepositoryStr())) - uv := url.Values{ - "mount": []string{h.String()}, + uv := url.Values{} + if mount != "" { + uv["mount"] = []string{mount} } - l, err := w.img.LayerByDigest(h) - if err != nil { - return "", false, err - } - - if ml, ok := l.(*MountableLayer); ok { - if w.ref.Context().RegistryStr() == ml.Reference.Context().RegistryStr() { - uv["from"] = []string{ml.Reference.Context().RepositoryStr()} - } + if from != "" { + uv["from"] = []string{from} } u.RawQuery = uv.Encode() @@ -181,15 +174,7 @@ func (w *writer) initiateUpload(h v1.Hash) (location string, mounted bool, err e // streamBlob streams the contents of the blob to the specified location. // On failure, this will return an error. On success, this will return the location // header indicating how to commit the streamed blob. -func (w *writer) streamBlob(h v1.Hash, streamLocation string) (commitLocation string, err error) { - l, err := w.img.LayerByDigest(h) - if err != nil { - return "", err - } - blob, err := l.Compressed() - if err != nil { - return "", err - } +func (w *writer) streamBlob(blob io.ReadCloser, streamLocation string) (commitLocation string, err error) { defer blob.Close() req, err := http.NewRequest(http.MethodPatch, streamLocation, blob) @@ -212,14 +197,15 @@ func (w *writer) streamBlob(h v1.Hash, streamLocation string) (commitLocation st return w.nextLocation(resp) } -// commitBlob commits this blob by sending a PUT to the location returned from streaming the blob. 
-func (w *writer) commitBlob(h v1.Hash, location string) (err error) { +// commitBlob commits this blob by sending a PUT to the location returned from +// streaming the blob. +func (w *writer) commitBlob(location, digest string) error { u, err := url.Parse(location) if err != nil { return err } v := u.Query() - v.Set("digest", h.String()) + v.Set("digest", digest) u.RawQuery = v.Encode() req, err := http.NewRequest(http.MethodPut, u.String(), nil) @@ -237,33 +223,68 @@ func (w *writer) commitBlob(h v1.Hash, location string) (err error) { } // uploadOne performs a complete upload of a single layer. -func (w *writer) uploadOne(h v1.Hash) error { - existing, err := w.checkExisting(h) - if err != nil { - return err +func (w *writer) uploadOne(l v1.Layer) error { + var from, mount, digest string + if _, ok := l.(*stream.Layer); !ok { + // Layer isn't streamable, we should take advantage of that to + // skip uploading if possible. + // By sending ?digest= in the request, we'll also check that + // our computed digest matches the one computed by the + // registry. 
+ h, err := l.Digest() + if err != nil { + return err + } + digest = h.String() + + existing, err := w.checkExisting(h) + if err != nil { + return err + } + if existing { + log.Printf("existing blob: %v", h) + return nil + } + + mount = h.String() } - if existing { - log.Printf("existing blob: %v", h) - return nil + if ml, ok := l.(*MountableLayer); ok { + if w.ref.Context().RegistryStr() == ml.Reference.Context().RegistryStr() { + from = ml.Reference.Context().RepositoryStr() + } } - location, mounted, err := w.initiateUpload(h) + location, mounted, err := w.initiateUpload(from, mount) if err != nil { return err } else if mounted { - log.Printf("mounted blob: %v", h) + h, err := l.Digest() + if err != nil { + return err + } + log.Printf("mounted blob: %s", h.String()) return nil } - location, err = w.streamBlob(h, location) + blob, err := l.Compressed() + if err != nil { + return err + } + location, err = w.streamBlob(blob, location) + if err != nil { + return err + } + + h, err := l.Digest() if err != nil { return err } + digest = h.String() - if err := w.commitBlob(h, location); err != nil { + if err := w.commitBlob(location, digest); err != nil { return err } - log.Printf("pushed blob %v", h) + log.Printf("pushed blob: %s", digest) return nil } diff --git a/pkg/v1/remote/write_test.go b/pkg/v1/remote/write_test.go index 2dbcaed72..fcb561344 100644 --- a/pkg/v1/remote/write_test.go +++ b/pkg/v1/remote/write_test.go @@ -16,8 +16,11 @@ package remote import ( "bytes" + "crypto/sha256" + "encoding/hex" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "net/http/httptest" @@ -27,13 +30,12 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - "github.com/google/go-containerregistry/pkg/authn" "github.com/google/go-containerregistry/pkg/name" "github.com/google/go-containerregistry/pkg/v1" "github.com/google/go-containerregistry/pkg/v1/random" - "github.com/google/go-containerregistry/pkg/v1/remote/transport" + 
"github.com/google/go-containerregistry/pkg/v1/stream" ) func mustNewTag(t *testing.T, s string) name.Tag { @@ -264,7 +266,7 @@ func TestInitiateUploadNoMountsExists(t *testing.T) { } defer closer.Close() - _, mounted, err := w.initiateUpload(h) + _, mounted, err := w.initiateUpload("", h.String()) if err != nil { t.Errorf("intiateUpload() = %v", err) } @@ -301,7 +303,7 @@ func TestInitiateUploadNoMountsInitiated(t *testing.T) { } defer closer.Close() - location, mounted, err := w.initiateUpload(h) + location, mounted, err := w.initiateUpload("", h.String()) if err != nil { t.Errorf("intiateUpload() = %v", err) } @@ -339,7 +341,7 @@ func TestInitiateUploadNoMountsBadStatus(t *testing.T) { } defer closer.Close() - location, mounted, err := w.initiateUpload(h) + location, mounted, err := w.initiateUpload("", h.String()) if err == nil { t.Errorf("intiateUpload() = %v, %v; wanted error", location, mounted) } @@ -377,7 +379,7 @@ func TestInitiateUploadMountsWithMountFromDifferentRegistry(t *testing.T) { } defer closer.Close() - _, mounted, err := w.initiateUpload(h) + _, mounted, err := w.initiateUpload("", h.String()) if err != nil { t.Errorf("intiateUpload() = %v", err) } @@ -427,7 +429,7 @@ func TestInitiateUploadMountsWithMountFromTheSameRegistry(t *testing.T) { } defer closer.Close() - _, mounted, err := w.initiateUpload(h) + _, mounted, err := w.initiateUpload(expectedMountRepo, h.String()) if err != nil { t.Errorf("intiateUpload() = %v", err) } @@ -470,7 +472,16 @@ func TestStreamBlob(t *testing.T) { streamLocation := w.url(expectedPath) - commitLocation, err := w.streamBlob(h, streamLocation.String()) + l, err := img.LayerByDigest(h) + if err != nil { + t.Fatalf("LayerByDigest: %v", err) + } + blob, err := l.Compressed() + if err != nil { + t.Fatalf("layer.Compressed: %v", err) + } + + commitLocation, err := w.streamBlob(blob, streamLocation.String()) if err != nil { t.Errorf("streamBlob() = %v", err) } @@ -479,6 +490,58 @@ func TestStreamBlob(t *testing.T) { } 
} +func TestStreamLayer(t *testing.T) { + var n, wantSize int64 = 10000, 49 + newBlob := func() io.ReadCloser { return ioutil.NopCloser(bytes.NewReader(bytes.Repeat([]byte{'a'}, int(n)))) } + wantDigest := "sha256:3d7c465be28d9e1ed810c42aeb0e747b44441424f566722ba635dc93c947f30e" + + expectedPath := "/vWhatever/I/decide" + expectedCommitLocation := "https://commit.io/v12/blob" + w, closer, err := setupWriter("what/ever", nil, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPatch { + t.Errorf("Method; got %v, want %v", r.Method, http.MethodPatch) + } + if r.URL.Path != expectedPath { + t.Errorf("URL; got %v, want %v", r.URL.Path, expectedPath) + } + + h := sha256.New() + s, err := io.Copy(h, r.Body) + if err != nil { + t.Errorf("Reading body: %v", err) + } + if s != wantSize { + t.Errorf("Received %d bytes, want %d", s, wantSize) + } + gotDigest := "sha256:" + hex.EncodeToString(h.Sum(nil)) + if gotDigest != wantDigest { + t.Errorf("Received bytes with digest %q, want %q", gotDigest, wantDigest) + } + + w.Header().Set("Location", expectedCommitLocation) + http.Error(w, "Created", http.StatusCreated) + })) + if err != nil { + t.Fatalf("setupWriter() = %v", err) + } + defer closer.Close() + + streamLocation := w.url(expectedPath) + sl := stream.NewLayer(newBlob()) + blob, err := sl.Compressed() + if err != nil { + t.Fatalf("layer.Compressed: %v", err) + } + + commitLocation, err := w.streamBlob(blob, streamLocation.String()) + if err != nil { + t.Errorf("streamBlob: %v", err) + } + if commitLocation != expectedCommitLocation { + t.Errorf("streamBlob(); got %v, want %v", commitLocation, expectedCommitLocation) + } +} + func TestCommitBlob(t *testing.T) { img := setupImage(t) h := mustConfigName(t, img) @@ -506,7 +569,7 @@ func TestCommitBlob(t *testing.T) { commitLocation := w.url(expectedPath) - if err := w.commitBlob(h, commitLocation.String()); err != nil { + if err := w.commitBlob(commitLocation.String(), h.String()); err 
!= nil { t.Errorf("commitBlob() = %v", err) } } @@ -564,11 +627,76 @@ func TestUploadOne(t *testing.T) { } defer closer.Close() - if err := w.uploadOne(h); err != nil { + l, err := img.LayerByDigest(h) + if err != nil { + t.Fatalf("LayerByDigest: %v", err) + } + if err := w.uploadOne(l); err != nil { t.Errorf("uploadOne() = %v", err) } } +func TestUploadOneStreamedLayer(t *testing.T) { + expectedRepo := "baz/blah" + initiatePath := fmt.Sprintf("/v2/%s/blobs/uploads/", expectedRepo) + streamPath := "/path/to/upload" + commitPath := "/path/to/commit" + + w, closer, err := setupWriter(expectedRepo, nil, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case initiatePath: + if r.Method != http.MethodPost { + t.Errorf("Method; got %v, want %v", r.Method, http.MethodPost) + } + w.Header().Set("Location", streamPath) + http.Error(w, "Initiated", http.StatusAccepted) + case streamPath: + if r.Method != http.MethodPatch { + t.Errorf("Method; got %v, want %v", r.Method, http.MethodPatch) + } + // TODO(jasonhall): What should we check here? 
+ w.Header().Set("Location", commitPath) + http.Error(w, "Initiated", http.StatusAccepted) + case commitPath: + if r.Method != http.MethodPut { + t.Errorf("Method; got %v, want %v", r.Method, http.MethodPut) + } + http.Error(w, "Created", http.StatusCreated) + default: + t.Fatalf("Unexpected path: %v", r.URL.Path) + } + })) + if err != nil { + t.Fatalf("setupWriter() = %v", err) + } + defer closer.Close() + + var n, wantSize int64 = 10000, 49 + newBlob := func() io.ReadCloser { return ioutil.NopCloser(bytes.NewReader(bytes.Repeat([]byte{'a'}, int(n)))) } + wantDigest := "sha256:3d7c465be28d9e1ed810c42aeb0e747b44441424f566722ba635dc93c947f30e" + wantDiffID := "sha256:27dd1f61b867b6a0f6e9d8a41c43231de52107e53ae424de8f847b821db4b711" + l := stream.NewLayer(newBlob()) + if err := w.uploadOne(l); err != nil { + t.Fatalf("uploadOne: %v", err) + } + + if dig, err := l.Digest(); err != nil { + t.Errorf("Digest: %v", err) + } else if dig.String() != wantDigest { + t.Errorf("Digest got %q, want %q", dig, wantDigest) + } + if diffID, err := l.DiffID(); err != nil { + t.Errorf("DiffID: %v", err) + } else if diffID.String() != wantDiffID { + t.Errorf("DiffID got %q, want %q", diffID, wantDiffID) + } + if size, err := l.Size(); err != nil { + t.Errorf("Size: %v", err) + } else if size != wantSize { + t.Errorf("Size got %d, want %d", size, wantSize) + } +} + func TestCommitImage(t *testing.T) { img := setupImage(t) @@ -711,7 +839,6 @@ func TestWriteWithErrors(t *testing.T) { } func TestScopesForUploadingImage(t *testing.T) { - referenceToUpload, err := name.NewTag("example.com/sample/sample:latest", name.WeakValidation) if err != nil { t.Fatalf("name.NewTag() = %v", err) diff --git a/pkg/v1/stream/layer.go b/pkg/v1/stream/layer.go new file mode 100644 index 000000000..d1ec96b1f --- /dev/null +++ b/pkg/v1/stream/layer.go @@ -0,0 +1,176 @@ +// Copyright 2018 Google LLC All Rights Reserved. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stream + +import ( + "compress/gzip" + "crypto/sha256" + "encoding/hex" + "errors" + "hash" + "io" + + "github.com/google/go-containerregistry/pkg/v1" +) + +var ( + // ErrNotComputed is returned when the requested value is not yet + // computed because the stream has not been consumed yet. + ErrNotComputed = errors.New("value not computed until stream is consumed") + + // ErrConsumed is returned by Compressed when the underlying stream has + // already been consumed and closed. 
+ ErrConsumed = errors.New("stream was already consumed") +) + +type Layer struct { + blob io.ReadCloser + consumed bool + + digest, diffID *v1.Hash + size int64 +} + +var _ v1.Layer = (*Layer)(nil) + +func NewLayer(rc io.ReadCloser) *Layer { return &Layer{blob: rc} } + +func (l *Layer) Digest() (v1.Hash, error) { + if l.digest == nil { + return v1.Hash{}, ErrNotComputed + } + return *l.digest, nil +} + +func (l *Layer) DiffID() (v1.Hash, error) { + if l.diffID == nil { + return v1.Hash{}, ErrNotComputed + } + return *l.diffID, nil +} + +func (l *Layer) Size() (int64, error) { + if l.size == 0 { + return 0, ErrNotComputed + } + return l.size, nil +} + +func (l *Layer) Uncompressed() (io.ReadCloser, error) { + return nil, errors.New("NYI: stream.Layer.Uncompressed is not implemented") +} + +func (l *Layer) Compressed() (io.ReadCloser, error) { + if l.consumed { + return nil, ErrConsumed + } + return newCompressedReader(l) +} + +type compressedReader struct { + closer io.Closer // original blob's Closer. + + h, zh hash.Hash // collects digests of compressed and uncompressed stream. + pr io.Reader + count *countWriter + + l *Layer // stream.Layer to update upon Close. +} + +func newCompressedReader(l *Layer) (*compressedReader, error) { + h := sha256.New() + zh := sha256.New() + count := &countWriter{} + + // gzip.Writer writes to the output stream via pipe, a hasher to + // capture compressed digest, and a countWriter to capture compressed + // size. + pr, pw := io.Pipe() + zw, err := gzip.NewWriterLevel(io.MultiWriter(pw, zh, count), gzip.BestSpeed) + if err != nil { + return nil, err + } + + cr := &compressedReader{ + closer: newMultiCloser(zw, l.blob), + pr: pr, + h: h, + zh: zh, + count: count, + l: l, + } + go func() { + if _, err := io.Copy(io.MultiWriter(h, zw), l.blob); err != nil { + pw.CloseWithError(err) + return + } + // Now close the compressed reader, to flush the gzip stream + // and calculate digest/diffID/size. 
This will cause pr to + // return EOF which will cause readers of the Compressed stream + // to finish reading. + pw.CloseWithError(cr.Close()) + }() + + return cr, nil +} + +func (cr *compressedReader) Read(b []byte) (int, error) { return cr.pr.Read(b) } + +func (cr *compressedReader) Close() error { + // Close the inner ReadCloser. + if err := cr.closer.Close(); err != nil { + return err + } + + diffID, err := v1.NewHash("sha256:" + hex.EncodeToString(cr.h.Sum(nil))) + if err != nil { + return err + } + cr.l.diffID = &diffID + + digest, err := v1.NewHash("sha256:" + hex.EncodeToString(cr.zh.Sum(nil))) + if err != nil { + return err + } + cr.l.digest = &digest + + cr.l.size = cr.count.n + cr.l.consumed = true + return nil +} + +// countWriter counts bytes written to it. +type countWriter struct{ n int64 } + +func (c *countWriter) Write(p []byte) (int, error) { + c.n += int64(len(p)) + return len(p), nil +} + +// multiCloser is a Closer that collects multiple Closers and Closes them in order. +type multiCloser []io.Closer + +var _ io.Closer = (multiCloser)(nil) + +func newMultiCloser(c ...io.Closer) multiCloser { return multiCloser(c) } + +func (m multiCloser) Close() error { + for _, c := range m { + if err := c.Close(); err != nil { + return err + } + } + return nil +} diff --git a/pkg/v1/stream/layer_test.go b/pkg/v1/stream/layer_test.go new file mode 100644 index 000000000..3e730e699 --- /dev/null +++ b/pkg/v1/stream/layer_test.go @@ -0,0 +1,200 @@ +// Copyright 2018 Google LLC All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package stream
+
+import (
+	"archive/tar"
+	"bytes"
+	"crypto/rand"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"strings"
+	"testing"
+
+	"github.com/google/go-containerregistry/pkg/v1"
+	"github.com/google/go-containerregistry/pkg/v1/tarball"
+)
+
+// TestStreamVsBuffer checks that streaming a blob produces the same
+// digest/diffID/size as buffering it through tarball.LayerFromOpener.
+func TestStreamVsBuffer(t *testing.T) {
+	var n, wantSize int64 = 10000, 49
+	newBlob := func() io.ReadCloser { return ioutil.NopCloser(bytes.NewReader(bytes.Repeat([]byte{'a'}, int(n)))) }
+	wantDigest := "sha256:3d7c465be28d9e1ed810c42aeb0e747b44441424f566722ba635dc93c947f30e"
+	wantDiffID := "sha256:27dd1f61b867b6a0f6e9d8a41c43231de52107e53ae424de8f847b821db4b711"
+
+	// Check that streaming some content results in the expected digest/diffID/size.
+	l := NewLayer(newBlob())
+	if c, err := l.Compressed(); err != nil {
+		t.Errorf("Compressed: %v", err)
+	} else {
+		if _, err := io.Copy(ioutil.Discard, c); err != nil {
+			t.Errorf("error reading Compressed: %v", err)
+		}
+		if err := c.Close(); err != nil {
+			t.Errorf("Close: %v", err)
+		}
+	}
+	if d, err := l.Digest(); err != nil {
+		t.Errorf("Digest: %v", err)
+	} else if d.String() != wantDigest {
+		t.Errorf("stream Digest got %q, want %q", d.String(), wantDigest)
+	}
+	if d, err := l.DiffID(); err != nil {
+		t.Errorf("DiffID: %v", err)
+	} else if d.String() != wantDiffID {
+		t.Errorf("stream DiffID got %q, want %q", d.String(), wantDiffID)
+	}
+	if s, err := l.Size(); err != nil {
+		t.Errorf("Size: %v", err)
+	} else if s != wantSize {
+		t.Errorf("stream Size got %d, want %d", s, wantSize)
+	}
+
+	// Test that buffering the same contents and using
+	// tarball.LayerFromOpener results in the same digest/diffID/size.
+	tl, err := tarball.LayerFromOpener(func() (io.ReadCloser, error) { return newBlob(), nil })
+	if err != nil {
+		t.Fatalf("LayerFromOpener: %v", err)
+	}
+	if d, err := tl.Digest(); err != nil {
+		t.Errorf("Digest: %v", err)
+	} else if d.String() != wantDigest {
+		t.Errorf("tarball Digest got %q, want %q", d.String(), wantDigest)
+	}
+	if d, err := tl.DiffID(); err != nil {
+		t.Errorf("DiffID: %v", err)
+	} else if d.String() != wantDiffID {
+		t.Errorf("tarball DiffID got %q, want %q", d.String(), wantDiffID)
+	}
+	if s, err := tl.Size(); err != nil {
+		t.Errorf("Size: %v", err)
+	} else if s != wantSize {
+		// Fixed: this branch tests the tarball layer, not the stream.
+		t.Errorf("tarball Size got %d, want %d", s, wantSize)
+	}
+}
+
+// TestLargeStream streams 100MB of random bytes and checks that
+// digest/diffID are populated and the compressed size matches.
+func TestLargeStream(t *testing.T) {
+	var n, wantSize int64 = 100000000, 100007653 // "Compressing" n random bytes results in this many bytes.
+	sl := NewLayer(ioutil.NopCloser(io.LimitReader(rand.Reader, n)))
+	rc, err := sl.Compressed()
+	if err != nil {
+		// Fixed: this calls Compressed, not Uncompressed.
+		t.Fatalf("Compressed: %v", err)
+	}
+	if _, err := io.Copy(ioutil.Discard, rc); err != nil {
+		t.Fatalf("Reading layer: %v", err)
+	}
+	if err := rc.Close(); err != nil {
+		t.Fatalf("Close: %v", err)
+	}
+
+	if dig, err := sl.Digest(); err != nil {
+		t.Errorf("Digest: %v", err)
+	} else if dig.String() == (v1.Hash{}).String() {
+		t.Errorf("Digest got %q, want anything else", (v1.Hash{}).String())
+	}
+	if diffID, err := sl.DiffID(); err != nil {
+		t.Errorf("DiffID: %v", err)
+	} else if diffID.String() == (v1.Hash{}).String() {
+		t.Errorf("DiffID got %q, want anything else", (v1.Hash{}).String())
+	}
+	if size, err := sl.Size(); err != nil {
+		t.Errorf("Size: %v", err)
+	} else if size != wantSize {
+		// Fixed: report the compared-against value (wantSize), not n.
+		t.Errorf("Size got %d, want %d", size, wantSize)
+	}
+}
+
+// TestStreamableLayerFromTarball streams a tarball of 1000 small files
+// through the layer and checks the resulting digest.
+func TestStreamableLayerFromTarball(t *testing.T) {
+	pr, pw := io.Pipe()
+	tw := tar.NewWriter(pw)
+	go func() {
+		// "Stream" a bunch of files into the layer.
+		pw.CloseWithError(func() error {
+			for i := 0; i < 1000; i++ {
+				name := fmt.Sprintf("file-%d.txt", i)
+				body := fmt.Sprintf("i am file number %d", i)
+				if err := tw.WriteHeader(&tar.Header{
+					Name:     name,
+					Mode:     0600,
+					Size:     int64(len(body)),
+					Typeflag: tar.TypeReg,
+				}); err != nil {
+					return err
+				}
+				if _, err := tw.Write([]byte(body)); err != nil {
+					return err
+				}
+			}
+			return nil
+		}())
+	}()
+
+	l := NewLayer(pr)
+	rc, err := l.Compressed()
+	if err != nil {
+		t.Fatalf("Compressed: %v", err)
+	}
+	if _, err := io.Copy(ioutil.Discard, rc); err != nil {
+		t.Fatalf("Copy: %v", err)
+	}
+	if err := rc.Close(); err != nil {
+		t.Fatalf("Close: %v", err)
+	}
+
+	wantDigest := "sha256:f53d6a164ab476294212843f267740bd12f79e00abd8050c24ce8a9bceaa36b0"
+	if got, err := l.Digest(); err != nil {
+		t.Errorf("Digest: %v", err)
+	} else if got.String() != wantDigest {
+		t.Errorf("Digest: got %q, want %q", got.String(), wantDigest)
+	}
+}
+
+// TestNotComputed tests that Digest/DiffID/Size return ErrNotComputed before
+// the stream has been consumed.
+func TestNotComputed(t *testing.T) {
+	l := NewLayer(ioutil.NopCloser(bytes.NewBufferString("hi")))
+
+	// All methods should return ErrNotComputed until the stream has been
+	// consumed and closed.
+	if _, err := l.Size(); err != ErrNotComputed {
+		t.Errorf("Size: got %v, want %v", err, ErrNotComputed)
+	}
+	// Fixed: assert the specific sentinel error, not merely a non-nil error.
+	if _, err := l.Digest(); err != ErrNotComputed {
+		t.Errorf("Digest: got %v, want %v", err, ErrNotComputed)
+	}
+	if _, err := l.DiffID(); err != ErrNotComputed {
+		t.Errorf("DiffID: got %v, want %v", err, ErrNotComputed)
+	}
+}
+
+// TestConsumed tests that Compressed returns ErrConsumed when the stream has
+// already been consumed.
+func TestConsumed(t *testing.T) {
+	l := NewLayer(ioutil.NopCloser(strings.NewReader("hello")))
+	// Consume the stream once: read it fully, then Close to finalize it.
+	rc, err := l.Compressed()
+	if err != nil {
+		t.Errorf("Compressed: %v", err)
+	}
+	if _, err := io.Copy(ioutil.Discard, rc); err != nil {
+		t.Errorf("Error reading contents: %v", err)
+	}
+	if err := rc.Close(); err != nil {
+		t.Errorf("Close: %v", err)
+	}
+
+	// A stream.Layer is single-use: a second Compressed call must fail
+	// with the ErrConsumed sentinel.
+	if _, err := l.Compressed(); err != ErrConsumed {
+		t.Errorf("Compressed() after consuming; got %v, want %v", err, ErrConsumed)
+	}
+}