
Commit

create pass-through writer and iowriter
Signed-off-by: Avi Deitcher <avi@deitcher.net>
deitch committed Oct 1, 2020
1 parent 78a9b6b commit bfb413b
Showing 2 changed files with 201 additions and 0 deletions.
110 changes: 110 additions & 0 deletions pkg/content/iowriter.go
@@ -0,0 +1,110 @@
package content

import (
    "context"
    "fmt"
    "io"

    "github.com/containerd/containerd/content"
    "github.com/opencontainers/go-digest"
)

const (
    // DefaultBlocksize is the default size, in bytes, of the byte slice used for each io copy
    DefaultBlocksize = 20480
)

// IoContentWriter is a content.Writer that wraps an io.Writer, so that results can be streamed
// to an open io.Writer. For example, it can be used to pull a layer and write it to a file or device.
type IoContentWriter struct {
    writer    io.Writer
    digester  digest.Digester
    size      int64
    blocksize int
}

// NewIoContentWriter creates a new IoContentWriter. blocksize is the size of the block to copy,
// in bytes, between the parent and child; when 0, it defaults to DefaultBlocksize.
func NewIoContentWriter(writer io.Writer, blocksize int) content.Writer {
    if blocksize == 0 {
        blocksize = DefaultBlocksize
    }
    ioc := &IoContentWriter{
        writer:    writer,
        digester:  digest.Canonical.Digester(),
        blocksize: blocksize,
    }
    return NewPassthroughWriter(ioc, func(pw *PassthroughWriter) {
        // copy the uncompressed data through to the underlying writer
        var err error
        b := make([]byte, ioc.blocksize)
        for {
            n, rerr := pw.Reader.Read(b)
            if rerr != nil && rerr != io.EOF {
                err = fmt.Errorf("IoContentWriter: data read error: %v", rerr)
                break
            }
            if n > 0 {
                if werr := pw.UnderlyingWrite(b[:n]); werr != nil {
                    err = fmt.Errorf("IoContentWriter: error writing to underlying writer: %v", werr)
                    break
                }
            }
            if rerr == io.EOF {
                break
            }
        }
        pw.Done <- err
    })
}

func (w *IoContentWriter) Write(p []byte) (n int, err error) {
    var l int
    if w.writer != nil {
        l, err = w.writer.Write(p)
        if err != nil {
            return 0, err
        }
    } else {
        // no underlying writer, so just count and hash the bytes
        l = len(p)
    }
    w.digester.Hash().Write(p[:l])
    w.size += int64(l)
    return l, nil
}

// Close is a no-op; the wrapped io.Writer is left open and remains the caller's responsibility to close.
func (w *IoContentWriter) Close() error {
    return nil
}

// Digest may return empty digest or panics until committed.
func (w *IoContentWriter) Digest() digest.Digest {
    return w.digester.Digest()
}

// Commit commits the blob (but no roll-back is guaranteed on an error).
// size and expected can be zero-value when unknown.
// Commit always closes the writer, even on error.
// ErrAlreadyExists aborts the writer.
func (w *IoContentWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
    return nil
}

// Status returns the current state of write
func (w *IoContentWriter) Status() (content.Status, error) {
    return content.Status{}, nil
}

// Truncate updates the size of the target blob
func (w *IoContentWriter) Truncate(size int64) error {
    return nil
}
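
By way of illustration, a minimal usage sketch of streaming content into a file via NewIoContentWriter. The import path (github.com/deislabs/oras/pkg/content) and the literal payload are assumptions standing in for the actual module path and a real registry pull.

package main

import (
    "context"
    "os"

    // assumed module path for this package; adjust to the repository's actual module
    "github.com/deislabs/oras/pkg/content"
)

func main() {
    // Stream blob bytes straight into a file on disk.
    f, err := os.Create("layer.tar")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    // 0 selects DefaultBlocksize for the pass-through copy loop.
    w := content.NewIoContentWriter(f, 0)

    // In a real pull the bytes would come from a registry fetcher; here we
    // write a placeholder payload through the content.Writer interface.
    if _, err := w.Write([]byte("example blob payload")); err != nil {
        panic(err)
    }

    // Commit closes the pipe, waits for the copy goroutine, and finalizes the write.
    if err := w.Commit(context.Background(), 0, ""); err != nil {
        panic(err)
    }
}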
91 changes: 91 additions & 0 deletions pkg/content/passthrough.go
@@ -0,0 +1,91 @@
package content

import (
    "context"
    "io"

    "github.com/containerd/containerd/content"
    "github.com/opencontainers/go-digest"
)

// PassthroughWriter takes an input stream and passes it through to an underlying writer,
// while providing the ability to manipulate the stream before it gets passed through.
type PassthroughWriter struct {
    writer             content.Writer
    pipew              *io.PipeWriter
    digester           digest.Digester
    size               int64
    underlyingDigester digest.Digester
    underlyingSize     int64
    // Reader is the pipe reader from which the processing goroutine reads the data to process
    Reader *io.PipeReader
    // Done is the channel on which the processing goroutine must send its result (or nil)
    // when it finishes; if it never sends, Commit will block forever
    Done chan error
}

// NewPassthroughWriter creates a content.Writer that passes data through f before it reaches the
// underlying writer. f runs in its own goroutine; it reads from pw.Reader, writes processed data
// via pw.UnderlyingWrite, and must send a result (or nil) on pw.Done when it is finished.
func NewPassthroughWriter(writer content.Writer, f func(pw *PassthroughWriter)) content.Writer {
    r, w := io.Pipe()
    pw := &PassthroughWriter{
        writer:             writer,
        pipew:              w,
        digester:           digest.Canonical.Digester(),
        underlyingDigester: digest.Canonical.Digester(),
        Reader:             r,
        Done:               make(chan error, 1),
    }
    go f(pw)
    return pw
}

func (pw *PassthroughWriter) Write(p []byte) (n int, err error) {
    n, err = pw.pipew.Write(p)
    pw.digester.Hash().Write(p[:n])
    pw.size += int64(n)
    return
}

func (pw *PassthroughWriter) Close() error {
    pw.pipew.Close()
    pw.writer.Close()
    return nil
}

// Digest may return empty digest or panics until committed.
func (pw *PassthroughWriter) Digest() digest.Digest {
    return pw.digester.Digest()
}

// Commit commits the blob (but no roll-back is guaranteed on an error).
// size and expected can be zero-value when unknown.
// Commit always closes the writer, even on error.
// ErrAlreadyExists aborts the writer.
func (pw *PassthroughWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
    pw.pipew.Close()
    err := <-pw.Done
    pw.Reader.Close()
    if err != nil {
        return err
    }
    return pw.writer.Commit(ctx, pw.underlyingSize, pw.underlyingDigester.Digest(), opts...)
}

// Status returns the current state of write
func (pw *PassthroughWriter) Status() (content.Status, error) {
    return pw.writer.Status()
}

// Truncate updates the size of the target blob
func (pw *PassthroughWriter) Truncate(size int64) error {
    return pw.writer.Truncate(size)
}

// UnderlyingWrite writes p to the underlying writer, updating the underlying size and digest
func (pw *PassthroughWriter) UnderlyingWrite(p []byte) error {
    if _, err := pw.writer.Write(p); err != nil {
        return err
    }
    pw.underlyingSize += int64(len(p))
    pw.underlyingDigester.Hash().Write(p)
    return nil
}
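
Likewise, a minimal sketch of wiring a custom processing function into NewPassthroughWriter, here decompressing a gzip stream before it reaches the underlying writer. The gunzip helper and the import path are illustrative assumptions; any function that reads pw.Reader, calls pw.UnderlyingWrite, and signals pw.Done would do.

package main

import (
    "bytes"
    "compress/gzip"
    "context"
    "fmt"
    "io"
    "os"

    // assumed module path for this package
    "github.com/deislabs/oras/pkg/content"
)

// gunzip is an example processing function: it decompresses the incoming stream,
// writes the plain bytes to the underlying writer, and signals completion on Done.
func gunzip(pw *content.PassthroughWriter) {
    gr, err := gzip.NewReader(pw.Reader)
    if err != nil {
        pw.Done <- fmt.Errorf("gunzip: invalid gzip stream: %v", err)
        return
    }
    buf := make([]byte, 32*1024)
    for {
        n, rerr := gr.Read(buf)
        if n > 0 {
            if werr := pw.UnderlyingWrite(buf[:n]); werr != nil {
                pw.Done <- werr
                return
            }
        }
        if rerr == io.EOF {
            pw.Done <- nil
            return
        }
        if rerr != nil {
            pw.Done <- rerr
            return
        }
    }
}

func main() {
    // The underlying writer streams the decompressed bytes to stdout.
    inner := content.NewIoContentWriter(os.Stdout, 0)
    w := content.NewPassthroughWriter(inner, gunzip)

    // Build a small gzipped payload and push it through the pass-through writer.
    var zipped bytes.Buffer
    gz := gzip.NewWriter(&zipped)
    gz.Write([]byte("hello from the pass-through writer\n"))
    gz.Close()

    if _, err := w.Write(zipped.Bytes()); err != nil {
        panic(err)
    }
    if err := w.Commit(context.Background(), 0, ""); err != nil {
        panic(err)
    }
}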
