diff --git a/pkg/content/iowriter.go b/pkg/content/iowriter.go new file mode 100644 index 000000000..dad9dd8ca --- /dev/null +++ b/pkg/content/iowriter.go @@ -0,0 +1,82 @@ +package content + +import ( + "context" + "io" + "io/ioutil" + + "github.com/containerd/containerd/content" + "github.com/opencontainers/go-digest" +) + +// IoContentWriter writer that wraps an io.Writer, so the results can be streamed to +// an open io.Writer. For example, can be used to pull a layer and write it to a file, or device. +type IoContentWriter struct { + writer io.Writer + digester digest.Digester + size int64 +} + +// NewIoContentWriter create a new IoContentWriter. blocksize is the size of the block to copy, +// in bytes, between the parent and child. The default, when 0, is to simply use +// whatever golang defaults to with io.Copy +func NewIoContentWriter(writer io.Writer, blocksize int) content.Writer { + w := writer + if w == nil { + w = ioutil.Discard + } + ioc := &IoContentWriter{ + writer: w, + digester: digest.Canonical.Digester(), + } + return NewPassthroughWriter(ioc, func(r io.Reader, w io.Writer, done chan<- error) { + // write out the data to the io writer + var ( + err error + ) + if blocksize == 0 { + _, err = io.Copy(w, r) + } else { + b := make([]byte, blocksize, blocksize) + _, err = io.CopyBuffer(w, r, b) + } + done <- err + }) +} + +func (w *IoContentWriter) Write(p []byte) (n int, err error) { + n, err = w.writer.Write(p) + if err != nil { + return 0, err + } + w.digester.Hash().Write(p[:n]) + w.size += int64(n) + return +} + +func (w *IoContentWriter) Close() error { + return nil +} + +// Digest may return empty digest or panics until committed. +func (w *IoContentWriter) Digest() digest.Digest { + return w.digester.Digest() +} + +// Commit commits the blob (but no roll-back is guaranteed on an error). +// size and expected can be zero-value when unknown. +// Commit always closes the writer, even on error. +// ErrAlreadyExists aborts the writer. +func (w *IoContentWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { + return nil +} + +// Status returns the current state of write +func (w *IoContentWriter) Status() (content.Status, error) { + return content.Status{}, nil +} + +// Truncate updates the size of the target blob +func (w *IoContentWriter) Truncate(size int64) error { + return nil +} diff --git a/pkg/content/passthrough.go b/pkg/content/passthrough.go new file mode 100644 index 000000000..85c4873a7 --- /dev/null +++ b/pkg/content/passthrough.go @@ -0,0 +1,103 @@ +package content + +import ( + "context" + "io" + + "github.com/containerd/containerd/content" + "github.com/opencontainers/go-digest" +) + +// PassthroughWriter takes an input stream and passes it through to an underlying writer, +// while providing the ability to manipulate the stream before it gets passed through +type PassthroughWriter struct { + writer content.Writer + pipew *io.PipeWriter + digester digest.Digester + size int64 + underlyingDigester digest.Digester + underlyingSize int64 + reader *io.PipeReader + done chan error +} + +// NewPassthroughWriter creates a pass-through writer that allows for processing +// the content via an arbitrary function. The function should do whatever processing it +// wants, reading from the Reader to the Writer. When done, it must indicate via +// sending an error or nil to the Done +func NewPassthroughWriter(writer content.Writer, f func(r io.Reader, w io.Writer, done chan<- error)) content.Writer { + r, w := io.Pipe() + pw := &PassthroughWriter{ + writer: writer, + pipew: w, + digester: digest.Canonical.Digester(), + underlyingDigester: digest.Canonical.Digester(), + reader: r, + done: make(chan error, 1), + } + uw := &underlyingWriter{ + pw: pw, + } + go f(r, uw, pw.done) + return pw +} + +func (pw *PassthroughWriter) Write(p []byte) (n int, err error) { + n, err = pw.pipew.Write(p) + pw.digester.Hash().Write(p[:n]) + pw.size += int64(n) + return +} + +func (pw *PassthroughWriter) Close() error { + pw.pipew.Close() + pw.writer.Close() + return nil +} + +// Digest may return empty digest or panics until committed. +func (pw *PassthroughWriter) Digest() digest.Digest { + return pw.digester.Digest() +} + +// Commit commits the blob (but no roll-back is guaranteed on an error). +// size and expected can be zero-value when unknown. +// Commit always closes the writer, even on error. +// ErrAlreadyExists aborts the writer. +func (pw *PassthroughWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { + pw.pipew.Close() + err := <-pw.done + pw.reader.Close() + if err != nil && err != io.EOF { + return err + } + return pw.writer.Commit(ctx, pw.underlyingSize, pw.underlyingDigester.Digest(), opts...) +} + +// Status returns the current state of write +func (pw *PassthroughWriter) Status() (content.Status, error) { + return pw.writer.Status() +} + +// Truncate updates the size of the target blob +func (pw *PassthroughWriter) Truncate(size int64) error { + return pw.writer.Truncate(size) +} + +// underlyingWriter implementation of io.Writer to write to the underlying +// io.Writer +type underlyingWriter struct { + pw *PassthroughWriter +} + +// Write write to the underlying writer +func (u *underlyingWriter) Write(p []byte) (int, error) { + n, err := u.pw.writer.Write(p) + if err != nil { + return 0, err + } + + u.pw.underlyingSize += int64(len(p)) + u.pw.underlyingDigester.Hash().Write(p) + return n, nil +} diff --git a/pkg/content/passthrough_test.go b/pkg/content/passthrough_test.go new file mode 100644 index 000000000..87ba23604 --- /dev/null +++ b/pkg/content/passthrough_test.go @@ -0,0 +1,104 @@ +package content_test + +import ( + "context" + "fmt" + "io" + "testing" + + ctrcontent "github.com/containerd/containerd/content" + "github.com/deislabs/oras/pkg/content" + digest "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +var ( + testRef = "abc123" + testContent = []byte("Hello World!") + appendText = "1" + modifiedContent = fmt.Sprintf("%s%s", testContent, appendText) + testDescriptor = ocispec.Descriptor{ + MediaType: ocispec.MediaTypeImageConfig, + Digest: digest.FromBytes(testContent), + Size: int64(len(testContent)), + Annotations: map[string]string{ + ocispec.AnnotationTitle: testRef, + }, + } + modifiedDescriptor = ocispec.Descriptor{ + MediaType: ocispec.MediaTypeImageConfig, + Digest: digest.FromBytes([]byte(modifiedContent)), + Size: int64(len(modifiedContent)), + Annotations: map[string]string{ + ocispec.AnnotationTitle: testRef, + }, + } +) + +func TestPassthroughWriter(t *testing.T) { + // simple pass through function that modifies the data just slightly + f := func(r io.Reader, w io.Writer, done chan<- error) { + var ( + err error + n int + ) + for { + b := make([]byte, 1024) + n, err = r.Read(b) + if err != nil && err != io.EOF { + t.Fatalf("data read error: %v", err) + break + } + l := n + if n > len(b) { + l = len(b) + } + + // we change it just slightly + b = b[:l] + if l > 0 { + b = append(b, 0x31) + } + if _, err := w.Write(b); err != nil { + t.Fatalf("error writing to underlying writer: %v", err) + break + } + if err == io.EOF { + break + } + } + done <- err + } + ctx := context.Background() + mem := content.NewMemoryStore() + memw, err := mem.Writer(ctx, ctrcontent.WithDescriptor(modifiedDescriptor)) + if err != nil { + t.Fatalf("unexpected error getting the memory store writer: %v", err) + } + writer := content.NewPassthroughWriter(memw, f) + n, err := writer.Write(testContent) + if err != nil { + t.Fatalf("unexpected error on Write: %v", err) + } + if n != len(testContent) { + t.Fatalf("wrote %d bytes instead of %d", n, len(testContent)) + } + if err := writer.Commit(ctx, testDescriptor.Size, testDescriptor.Digest); err != nil { + t.Errorf("unexpected error on Commit: %v", err) + } + if digest := writer.Digest(); digest != testDescriptor.Digest { + t.Errorf("mismatched digest: actual %v, expected %v", digest, testDescriptor.Digest) + } + + // make sure the data is what we expected + _, b, found := mem.Get(modifiedDescriptor) + if !found { + t.Fatalf("target descriptor not found in underlying memory store") + } + if len(b) != len(modifiedContent) { + t.Errorf("unexpectedly got %d bytes instead of expected %d", len(b), len(modifiedContent)) + } + if string(b) != modifiedContent { + t.Errorf("mismatched content, expected '%s', got '%s'", modifiedContent, string(b)) + } +}