Skip to content

Commit

Permalink
create pass-through writer and iowriter
Browse files Browse the repository at this point in the history
Signed-off-by: Avi Deitcher <avi@deitcher.net>
  • Loading branch information
deitch committed Oct 24, 2020
1 parent 6414398 commit 3984d94
Show file tree
Hide file tree
Showing 3 changed files with 289 additions and 0 deletions.
82 changes: 82 additions & 0 deletions pkg/content/iowriter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package content

import (
"context"
"io"
"io/ioutil"

"github.com/containerd/containerd/content"
"github.com/opencontainers/go-digest"
)

// IoContentWriter writer that wraps an io.Writer, so the results can be streamed to
// an open io.Writer. For example, can be used to pull a layer and write it to a file, or device.
type IoContentWriter struct {
writer io.Writer
digester digest.Digester
size int64
}

// NewIoContentWriter create a new IoContentWriter. blocksize is the size of the block to copy,
// in bytes, between the parent and child. The default, when 0, is to simply use
// whatever golang defaults to with io.Copy
func NewIoContentWriter(writer io.Writer, blocksize int) content.Writer {
w := writer
if w == nil {
w = ioutil.Discard
}
ioc := &IoContentWriter{
writer: w,
digester: digest.Canonical.Digester(),
}
return NewPassthroughWriter(ioc, func(r io.Reader, w io.Writer, done chan<- error) {
// write out the data to the io writer
var (
err error
)
if blocksize == 0 {
_, err = io.Copy(w, r)
} else {
b := make([]byte, blocksize, blocksize)
_, err = io.CopyBuffer(w, r, b)
}
done <- err
})
}

func (w *IoContentWriter) Write(p []byte) (n int, err error) {
n, err = w.writer.Write(p)
if err != nil {
return 0, err
}
w.digester.Hash().Write(p[:n])
w.size += int64(n)
return
}

func (w *IoContentWriter) Close() error {
return nil
}

// Digest may return empty digest or panics until committed.
func (w *IoContentWriter) Digest() digest.Digest {
return w.digester.Digest()
}

// Commit commits the blob (but no roll-back is guaranteed on an error).
// size and expected can be zero-value when unknown.
// Commit always closes the writer, even on error.
// ErrAlreadyExists aborts the writer.
func (w *IoContentWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
return nil
}

// Status returns the current state of write
func (w *IoContentWriter) Status() (content.Status, error) {
return content.Status{}, nil
}

// Truncate updates the size of the target blob
func (w *IoContentWriter) Truncate(size int64) error {
return nil
}
103 changes: 103 additions & 0 deletions pkg/content/passthrough.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package content

import (
"context"
"io"

"github.com/containerd/containerd/content"
"github.com/opencontainers/go-digest"
)

// PassthroughWriter takes an input stream and passes it through to an underlying writer,
// while providing the ability to manipulate the stream before it gets passed through
type PassthroughWriter struct {
writer content.Writer
pipew *io.PipeWriter
digester digest.Digester
size int64
underlyingDigester digest.Digester
underlyingSize int64
reader *io.PipeReader
done chan error
}

// NewPassthroughWriter creates a pass-through writer that allows for processing
// the content via an arbitrary function. The function should do whatever processing it
// wants, reading from the Reader to the Writer. When done, it must indicate via
// sending an error or nil to the Done
func NewPassthroughWriter(writer content.Writer, f func(r io.Reader, w io.Writer, done chan<- error)) content.Writer {
r, w := io.Pipe()
pw := &PassthroughWriter{
writer: writer,
pipew: w,
digester: digest.Canonical.Digester(),
underlyingDigester: digest.Canonical.Digester(),
reader: r,
done: make(chan error, 1),
}
uw := &underlyingWriter{
pw: pw,
}
go f(r, uw, pw.done)
return pw
}

func (pw *PassthroughWriter) Write(p []byte) (n int, err error) {
n, err = pw.pipew.Write(p)
pw.digester.Hash().Write(p[:n])
pw.size += int64(n)
return
}

func (pw *PassthroughWriter) Close() error {
pw.pipew.Close()
pw.writer.Close()
return nil
}

// Digest may return empty digest or panics until committed.
func (pw *PassthroughWriter) Digest() digest.Digest {
return pw.digester.Digest()
}

// Commit commits the blob (but no roll-back is guaranteed on an error).
// size and expected can be zero-value when unknown.
// Commit always closes the writer, even on error.
// ErrAlreadyExists aborts the writer.
func (pw *PassthroughWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
pw.pipew.Close()
err := <-pw.done
pw.reader.Close()
if err != nil && err != io.EOF {
return err
}
return pw.writer.Commit(ctx, pw.underlyingSize, pw.underlyingDigester.Digest(), opts...)
}

// Status returns the current state of write
func (pw *PassthroughWriter) Status() (content.Status, error) {
return pw.writer.Status()
}

// Truncate updates the size of the target blob
func (pw *PassthroughWriter) Truncate(size int64) error {
return pw.writer.Truncate(size)
}

// underlyingWriter implementation of io.Writer to write to the underlying
// io.Writer
type underlyingWriter struct {
pw *PassthroughWriter
}

// Write write to the underlying writer
func (u *underlyingWriter) Write(p []byte) (int, error) {
n, err := u.pw.writer.Write(p)
if err != nil {
return 0, err
}

u.pw.underlyingSize += int64(len(p))
u.pw.underlyingDigester.Hash().Write(p)
return n, nil
}
104 changes: 104 additions & 0 deletions pkg/content/passthrough_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package content_test

import (
"context"
"fmt"
"io"
"testing"

ctrcontent "github.com/containerd/containerd/content"
"github.com/deislabs/oras/pkg/content"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

var (
testRef = "abc123"
testContent = []byte("Hello World!")
appendText = "1"
modifiedContent = fmt.Sprintf("%s%s", testContent, appendText)
testDescriptor = ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageConfig,
Digest: digest.FromBytes(testContent),
Size: int64(len(testContent)),
Annotations: map[string]string{
ocispec.AnnotationTitle: testRef,
},
}
modifiedDescriptor = ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageConfig,
Digest: digest.FromBytes([]byte(modifiedContent)),
Size: int64(len(modifiedContent)),
Annotations: map[string]string{
ocispec.AnnotationTitle: testRef,
},
}
)

func TestPassthroughWriter(t *testing.T) {
// simple pass through function that modifies the data just slightly
f := func(r io.Reader, w io.Writer, done chan<- error) {
var (
err error
n int
)
for {
b := make([]byte, 1024)
n, err = r.Read(b)
if err != nil && err != io.EOF {
t.Fatalf("data read error: %v", err)
break
}
l := n
if n > len(b) {
l = len(b)
}

// we change it just slightly
b = b[:l]
if l > 0 {
b = append(b, 0x31)
}
if _, err := w.Write(b); err != nil {
t.Fatalf("error writing to underlying writer: %v", err)
break
}
if err == io.EOF {
break
}
}
done <- err
}
ctx := context.Background()
mem := content.NewMemoryStore()
memw, err := mem.Writer(ctx, ctrcontent.WithDescriptor(modifiedDescriptor))
if err != nil {
t.Fatalf("unexpected error getting the memory store writer: %v", err)
}
writer := content.NewPassthroughWriter(memw, f)
n, err := writer.Write(testContent)
if err != nil {
t.Fatalf("unexpected error on Write: %v", err)
}
if n != len(testContent) {
t.Fatalf("wrote %d bytes instead of %d", n, len(testContent))
}
if err := writer.Commit(ctx, testDescriptor.Size, testDescriptor.Digest); err != nil {
t.Errorf("unexpected error on Commit: %v", err)
}
if digest := writer.Digest(); digest != testDescriptor.Digest {
t.Errorf("mismatched digest: actual %v, expected %v", digest, testDescriptor.Digest)
}

// make sure the data is what we expected
_, b, found := mem.Get(modifiedDescriptor)
if !found {
t.Fatalf("target descriptor not found in underlying memory store")
}
if len(b) != len(modifiedContent) {
t.Errorf("unexpectedly got %d bytes instead of expected %d", len(b), len(modifiedContent))
}
if string(b) != modifiedContent {
t.Errorf("mismatched content, expected '%s', got '%s'", modifiedContent, string(b))
}
}

0 comments on commit 3984d94

Please sign in to comment.