Skip to content

Commit

Permalink
pass through writer and io writer
Browse files Browse the repository at this point in the history
Signed-off-by: Avi Deitcher <avi@deitcher.net>
  • Loading branch information
jdolitsky authored and deitch committed Oct 23, 2020
2 parents bfb413b + 6414398 commit 9fe43b5
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 53 deletions.
23 changes: 23 additions & 0 deletions implementors.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ See [OCI Artifacts][artifacts] for how to add OCI Artifacts support to your regi
- [docker/distribution](#docker-distribution) - local/offline verification
- [Azure Container Registry](#azure-container-registry-acr)
- [Amazon Elastic Container Registry](#amazon-elastic-container-registry-ecr)
- [Google Artifact Registry](#google-artifact-registry-gar)

## Artifact Types Using ORAS

Expand Down Expand Up @@ -234,6 +235,28 @@ ACR Artifact Documentation: [aka.ms/acr/artifacts](https://aka.ms/acr/artifacts)
--media-type application/vnd.unknown.layer.v1+txt
```

### [Google Artifact Registry (GAR)](https://cloud.google.com/artifact-registry)

- Authenticating with GAR using the gcloud command-line tool

```sh
gcloud auth configure-docker ${REGION}-docker.pkg.dev
```

- Pushing Artifacts to GAR

```sh
oras push ${REGION}-docker.pkg.dev/${GCP_PROJECT}/samples/artifact:1.0 \
./artifact.txt:application/vnd.unknown.layer.v1+txt
```

- Pulling Artifacts from GAR

```sh
oras pull ${REGION}-docker.pkg.dev/${GCP_PROJECT}/samples/artifact:1.0 \
--media-type application/vnd.unknown.layer.v1+txt
```

## Adding Your Registry or Artifact Type

Do you support [OCI Artifacts][artifacts] and would like your registry and/or project listed here? Please [submit a PR](https://github.com/deislabs/oras/pulls), using similar formatting above. We're happy to promote all usage, as well as feedback.
Expand Down
45 changes: 10 additions & 35 deletions pkg/content/iowriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,65 +2,40 @@ package content

import (
"context"
"fmt"
"io"

"github.com/containerd/containerd/content"
"github.com/opencontainers/go-digest"
)

const (
// DefaultBlocksize size of byte slice for each io copy
DefaultBlocksize = 20480
)

// IoContentWriter writer that wraps an io.Writer, so the results can be streamed to
// an open io.Writer. For example, can be used to pull a layer and write it to a file, or device.
type IoContentWriter struct {
writer io.Writer
digester digest.Digester
size int64
blocksize int
}

// NewIoContentWriter create a new IoContentWriter. blocksize is the size of the block to copy,
// in bytes, between the parent and child. The default, when 0, is DefaultBlocksize.
// in bytes, between the parent and child. The default, when 0, is to simply use
// whatever golang defaults to with io.Copy
func NewIoContentWriter(writer io.Writer, blocksize int) content.Writer {
if blocksize == 0 {
blocksize = DefaultBlocksize
}
ioc := &IoContentWriter{
writer: writer,
digester: digest.Canonical.Digester(),
blocksize: blocksize,
}
return NewPassthroughWriter(ioc, func(pw *PassthroughWriter) {
// write out the uncompressed data
return NewPassthroughWriter(ioc, func(r io.Reader, w io.Writer, done chan<- error ) {
// write out the data to the io writer
var (
err error
n int
)
for {
b := make([]byte, ioc.blocksize, ioc.blocksize)
n, err = pw.Reader.Read(b)
if err != nil && err != io.EOF {
err = fmt.Errorf("WriterWrapper: data read error: %v", err)
break
}
l := n
if n > len(b) {
l = len(b)
}

if err = pw.UnderlyingWrite(b[:l]); err != nil {
err = fmt.Errorf("WriterWrapper: error writing to underlying writer: %v", err)
break
}
if err == io.EOF {
break
}
if blocksize == 0 {
_, err = io.Copy(w, r)
} else {
b := make([]byte, blocksize, blocksize)
_, err = io.CopyBuffer(w, r, b)
}
pw.Done <- err
done <- err
})
}

Expand Down
46 changes: 28 additions & 18 deletions pkg/content/passthrough.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,28 @@ type PassthroughWriter struct {
size int64
underlyingDigester digest.Digester
underlyingSize int64
// Reader reader that the go routine should read from to get data to process
Reader *io.PipeReader
// Done channel for go routine to indicate when it is done, and pass an error. If it does not,
// it might block forever
Done chan error
reader *io.PipeReader
done chan error
}

func NewPassthroughWriter(writer content.Writer, f func(pw *PassthroughWriter)) content.Writer {
// NewPassthroughWriter creates a pass-through writer that allows for processing
// the content via an arbitrary function. The function should do whatever processing it
// wants, reading from the Reader to the Writer. When done, it must indicate via
// sending an error or nil to the Done
func NewPassthroughWriter(writer content.Writer, f func(r io.Reader, w io.Writer, done chan<- error)) content.Writer {
r, w := io.Pipe()
pw := &PassthroughWriter{
writer: writer,
pipew: w,
digester: digest.Canonical.Digester(),
underlyingDigester: digest.Canonical.Digester(),
Reader: r,
Done: make(chan error, 1),
reader: r,
done: make(chan error, 1),
}
go f(pw)
uw := &underlyingWriter{
pw: pw,
}
go f(r, uw, pw.done)
return pw
}

Expand Down Expand Up @@ -62,9 +66,9 @@ func (pw *PassthroughWriter) Digest() digest.Digest {
// ErrAlreadyExists aborts the writer.
func (pw *PassthroughWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
pw.pipew.Close()
err := <-pw.Done
pw.Reader.Close()
if err != nil {
err := <-pw.done
pw.reader.Close()
if err != nil && err != io.EOF {
return err
}
return pw.writer.Commit(ctx, pw.underlyingSize, pw.underlyingDigester.Digest(), opts...)
Expand All @@ -80,12 +84,18 @@ func (pw *PassthroughWriter) Truncate(size int64) error {
return pw.writer.Truncate(size)
}

// underlyingWriter implementation of io.Writer to write to the underlying
// io.Writer
type underlyingWriter struct {
pw *PassthroughWriter
}
// UnderlyingWrite write to the underlying writer
func (pw *PassthroughWriter) UnderlyingWrite(p []byte) error {
if _, err := pw.writer.Write(p); err != nil {
return err
func (u *underlyingWriter) Write(p []byte) (int, error) {
n, err := u.pw.writer.Write(p)
if err != nil {
return 0, err
}
pw.underlyingSize += int64(len(p))
pw.underlyingDigester.Hash().Write(p)
return nil
u.pw.underlyingSize += int64(len(p))
u.pw.underlyingDigester.Hash().Write(p)
return n, nil
}
104 changes: 104 additions & 0 deletions pkg/content/passthrough_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package content_test

import (
"context"
"fmt"
"io"
"testing"

ctrcontent "github.com/containerd/containerd/content"
"github.com/deislabs/oras/pkg/content"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

var (
testRef = "abc123"
testContent = []byte("Hello World!")
appendText = "1"
modifiedContent = fmt.Sprintf("%s%s", testContent, appendText)
testDescriptor = ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageConfig,
Digest: digest.FromBytes(testContent),
Size: int64(len(testContent)),
Annotations: map[string]string{
ocispec.AnnotationTitle: testRef,
},
}
modifiedDescriptor = ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageConfig,
Digest: digest.FromBytes([]byte(modifiedContent)),
Size: int64(len(modifiedContent)),
Annotations: map[string]string{
ocispec.AnnotationTitle: testRef,
},
}
)

func TestPassthroughWriter(t *testing.T) {
// simple pass through function that modifies the data just slightly
f := func(r io.Reader, w io.Writer, done chan<- error) {
var (
err error
n int
)
for {
b := make([]byte, 1024)
n, err = r.Read(b)
if err != nil && err != io.EOF {
t.Fatalf("data read error: %v", err)
break
}
l := n
if n > len(b) {
l = len(b)
}

// we change it just slightly
b = b[:l]
if l > 0 {
b = append(b, 0x31)
}
if _, err := w.Write(b); err != nil {
t.Fatalf("error writing to underlying writer: %v", err)
break
}
if err == io.EOF {
break
}
}
done <- err
}
ctx := context.Background()
mem := content.NewMemoryStore()
memw, err := mem.Writer(ctx, ctrcontent.WithDescriptor(modifiedDescriptor))
if err != nil {
t.Fatalf("unexpected error getting the memory store writer: %v", err)
}
writer := content.NewPassthroughWriter(memw, f)
n, err := writer.Write(testContent)
if err != nil {
t.Fatalf("unexpected error on Write: %v", err)
}
if n != len(testContent) {
t.Fatalf("wrote %d bytes instead of %d", n, len(testContent))
}
if err := writer.Commit(ctx, testDescriptor.Size, testDescriptor.Digest); err != nil {
t.Errorf("unexpected error on Commit: %v", err)
}
if digest := writer.Digest(); digest != testDescriptor.Digest {
t.Errorf("mismatched digest: actual %v, expected %v", digest, testDescriptor.Digest)
}

// make sure the data is what we expected
_, b, found := mem.Get(modifiedDescriptor)
if !found {
t.Fatalf("target descriptor not found in underlying memory store")
}
if len(b) != len(modifiedContent) {
t.Errorf("unexpectedly got %d bytes instead of expected %d", len(b), len(modifiedContent))
}
if string(b) != modifiedContent {
t.Errorf("mismatched content, expected '%s', got '%s'", modifiedContent, string(b))
}
}

0 comments on commit 9fe43b5

Please sign in to comment.