Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create pass-through writer and iowriter #182

Merged
merged 1 commit into from
Oct 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions pkg/content/iowriter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package content

import (
"context"
"io"
"io/ioutil"

"github.com/containerd/containerd/content"
"github.com/opencontainers/go-digest"
)

// IoContentWriter writer that wraps an io.Writer, so the results can be streamed to
// an open io.Writer. For example, can be used to pull a layer and write it to a file, or device.
type IoContentWriter struct {
writer io.Writer
digester digest.Digester
size int64
}

// NewIoContentWriter create a new IoContentWriter. blocksize is the size of the block to copy,
// in bytes, between the parent and child. The default, when 0, is to simply use
// whatever golang defaults to with io.Copy
func NewIoContentWriter(writer io.Writer, blocksize int) content.Writer {
w := writer
if w == nil {
w = ioutil.Discard
}
ioc := &IoContentWriter{
writer: w,
digester: digest.Canonical.Digester(),
}
return NewPassthroughWriter(ioc, func(r io.Reader, w io.Writer, done chan<- error) {
// write out the data to the io writer
var (
err error
)
if blocksize == 0 {
_, err = io.Copy(w, r)
} else {
b := make([]byte, blocksize, blocksize)
_, err = io.CopyBuffer(w, r, b)
}
done <- err
})
}

func (w *IoContentWriter) Write(p []byte) (n int, err error) {
n, err = w.writer.Write(p)
if err != nil {
return 0, err
}
w.digester.Hash().Write(p[:n])
w.size += int64(n)
return
}

func (w *IoContentWriter) Close() error {
return nil
}

// Digest may return empty digest or panics until committed.
func (w *IoContentWriter) Digest() digest.Digest {
return w.digester.Digest()
}

// Commit commits the blob (but no roll-back is guaranteed on an error).
// size and expected can be zero-value when unknown.
// Commit always closes the writer, even on error.
// ErrAlreadyExists aborts the writer.
func (w *IoContentWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
return nil
}

// Status returns the current state of write
func (w *IoContentWriter) Status() (content.Status, error) {
return content.Status{}, nil
}

// Truncate updates the size of the target blob
func (w *IoContentWriter) Truncate(size int64) error {
return nil
}
103 changes: 103 additions & 0 deletions pkg/content/passthrough.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package content

import (
"context"
"io"

"github.com/containerd/containerd/content"
"github.com/opencontainers/go-digest"
)

// PassthroughWriter takes an input stream and passes it through to an underlying writer,
// while providing the ability to manipulate the stream before it gets passed through
type PassthroughWriter struct {
writer content.Writer
pipew *io.PipeWriter
digester digest.Digester
size int64
underlyingDigester digest.Digester
underlyingSize int64
reader *io.PipeReader
done chan error
}

// NewPassthroughWriter creates a pass-through writer that allows for processing
// the content via an arbitrary function. The function should do whatever processing it
// wants, reading from the Reader to the Writer. When done, it must indicate via
// sending an error or nil to the Done
func NewPassthroughWriter(writer content.Writer, f func(r io.Reader, w io.Writer, done chan<- error)) content.Writer {
r, w := io.Pipe()
pw := &PassthroughWriter{
writer: writer,
pipew: w,
digester: digest.Canonical.Digester(),
underlyingDigester: digest.Canonical.Digester(),
reader: r,
done: make(chan error, 1),
}
uw := &underlyingWriter{
pw: pw,
}
go f(r, uw, pw.done)
return pw
}

func (pw *PassthroughWriter) Write(p []byte) (n int, err error) {
n, err = pw.pipew.Write(p)
pw.digester.Hash().Write(p[:n])
pw.size += int64(n)
return
}

func (pw *PassthroughWriter) Close() error {
pw.pipew.Close()
pw.writer.Close()
return nil
}

// Digest may return empty digest or panics until committed.
func (pw *PassthroughWriter) Digest() digest.Digest {
return pw.digester.Digest()
}

// Commit commits the blob (but no roll-back is guaranteed on an error).
// size and expected can be zero-value when unknown.
// Commit always closes the writer, even on error.
// ErrAlreadyExists aborts the writer.
func (pw *PassthroughWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
pw.pipew.Close()
err := <-pw.done
pw.reader.Close()
if err != nil && err != io.EOF {
return err
}
return pw.writer.Commit(ctx, pw.underlyingSize, pw.underlyingDigester.Digest(), opts...)
}

// Status returns the current state of write
func (pw *PassthroughWriter) Status() (content.Status, error) {
return pw.writer.Status()
}

// Truncate updates the size of the target blob
func (pw *PassthroughWriter) Truncate(size int64) error {
return pw.writer.Truncate(size)
}

// underlyingWriter implementation of io.Writer to write to the underlying
// io.Writer
type underlyingWriter struct {
pw *PassthroughWriter
}

// Write write to the underlying writer
func (u *underlyingWriter) Write(p []byte) (int, error) {
n, err := u.pw.writer.Write(p)
if err != nil {
return 0, err
}

u.pw.underlyingSize += int64(len(p))
u.pw.underlyingDigester.Hash().Write(p)
return n, nil
}
104 changes: 104 additions & 0 deletions pkg/content/passthrough_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package content_test

import (
"context"
"fmt"
"io"
"testing"

ctrcontent "github.com/containerd/containerd/content"
"github.com/deislabs/oras/pkg/content"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

var (
testRef = "abc123"
testContent = []byte("Hello World!")
appendText = "1"
modifiedContent = fmt.Sprintf("%s%s", testContent, appendText)
testDescriptor = ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageConfig,
Digest: digest.FromBytes(testContent),
Size: int64(len(testContent)),
Annotations: map[string]string{
ocispec.AnnotationTitle: testRef,
},
}
modifiedDescriptor = ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageConfig,
Digest: digest.FromBytes([]byte(modifiedContent)),
Size: int64(len(modifiedContent)),
Annotations: map[string]string{
ocispec.AnnotationTitle: testRef,
},
}
)

func TestPassthroughWriter(t *testing.T) {
// simple pass through function that modifies the data just slightly
f := func(r io.Reader, w io.Writer, done chan<- error) {
var (
err error
n int
)
for {
b := make([]byte, 1024)
n, err = r.Read(b)
if err != nil && err != io.EOF {
t.Fatalf("data read error: %v", err)
break
}
l := n
if n > len(b) {
l = len(b)
}

// we change it just slightly
b = b[:l]
if l > 0 {
b = append(b, 0x31)
}
if _, err := w.Write(b); err != nil {
t.Fatalf("error writing to underlying writer: %v", err)
break
}
if err == io.EOF {
break
}
}
done <- err
}
ctx := context.Background()
mem := content.NewMemoryStore()
memw, err := mem.Writer(ctx, ctrcontent.WithDescriptor(modifiedDescriptor))
if err != nil {
t.Fatalf("unexpected error getting the memory store writer: %v", err)
}
writer := content.NewPassthroughWriter(memw, f)
n, err := writer.Write(testContent)
if err != nil {
t.Fatalf("unexpected error on Write: %v", err)
}
if n != len(testContent) {
t.Fatalf("wrote %d bytes instead of %d", n, len(testContent))
}
if err := writer.Commit(ctx, testDescriptor.Size, testDescriptor.Digest); err != nil {
t.Errorf("unexpected error on Commit: %v", err)
}
if digest := writer.Digest(); digest != testDescriptor.Digest {
t.Errorf("mismatched digest: actual %v, expected %v", digest, testDescriptor.Digest)
}

// make sure the data is what we expected
_, b, found := mem.Get(modifiedDescriptor)
if !found {
t.Fatalf("target descriptor not found in underlying memory store")
}
if len(b) != len(modifiedContent) {
t.Errorf("unexpectedly got %d bytes instead of expected %d", len(b), len(modifiedContent))
}
if string(b) != modifiedContent {
t.Errorf("mismatched content, expected '%s', got '%s'", modifiedContent, string(b))
}
}