options for content.Writers; options to pre-provide input and output hashing, and blocksize #192

Merged: 1 commit, Nov 9, 2020
10 changes: 9 additions & 1 deletion pkg/content/consts.go
@@ -1,6 +1,9 @@
package content

import ocispec "github.com/opencontainers/image-spec/specs-go/v1"
import (
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

const (
// DefaultBlobMediaType specifies the default blob media type
@@ -32,3 +35,8 @@ const (
// Simply uses the same size as io.Copy()
DefaultBlocksize = 32768
)

const (
// BlankHash is the digest of empty content, i.e. what you get for a blank (zero-length) input
BlankHash = digest.Digest("sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855")
)
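
For reference, BlankHash is the canonical SHA-256 digest of zero-length content. A minimal sketch (not part of this diff) that confirms the constant against the go-digest API:

package main

import (
	"fmt"

	"github.com/opencontainers/go-digest"
)

func main() {
	// digest.FromBytes(nil) hashes zero-length content with the canonical
	// algorithm (sha256); this prints the same value as BlankHash above
	fmt.Println(digest.FromBytes(nil))
}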
19 changes: 14 additions & 5 deletions pkg/content/gunzip.go
@@ -9,9 +9,18 @@ import (
)

// NewGunzipWriter wraps a writer with a gunzip, so that the stream is gunzipped
func NewGunzipWriter(writer content.Writer, blocksize int) content.Writer {
if blocksize == 0 {
blocksize = DefaultBlocksize
//
// By default, it calculates the hash when writing. If an expected hash is
// pre-provided via WithInputHash or WithOutputHash, it skips calculating the
// corresponding hash. Skipping the hash is intended to be used only if you
// are confident about the validity of the data being passed to the writer,
// and wish to save on the hashing time.
func NewGunzipWriter(writer content.Writer, opts ...WriterOpt) content.Writer {
// process opts for default
wOpts := DefaultWriterOpts()
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil
}
}
return NewPassthroughWriter(writer, func(r io.Reader, w io.Writer, done chan<- error) {
gr, err := gzip.NewReader(r)
@@ -20,7 +29,7 @@ func NewGunzipWriter(writer content.Writer, blocksize int) content.Writer {
return
}
// write out the uncompressed data
b := make([]byte, blocksize, blocksize)
b := make([]byte, wOpts.Blocksize, wOpts.Blocksize)
for {
var n int
n, err = gr.Read(b)
@@ -44,5 +53,5 @@ func NewGunzipWriter(writer content.Writer, blocksize int) content.Writer {
}
gr.Close()
done <- err
})
}, opts...)
}
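
With the variadic options, a caller that already trusts the compressed stream can skip hashing it. A sketch of intended usage, assuming this package is imported as orascontent, containerd's content package supplies the Writer interface, and gzStream and compressedDigest are hypothetical placeholders:

func decompressInto(dest content.Writer, gzStream io.Reader, compressedDigest digest.Digest) error {
	// skip hashing the gzipped input, since its digest is already known
	gw := orascontent.NewGunzipWriter(dest,
		orascontent.WithBlocksize(64*1024),
		orascontent.WithInputHash(compressedDigest),
	)
	if _, err := io.Copy(gw, gzStream); err != nil {
		return err
	}
	// a real caller would follow up with gw.Commit, passing the expected
	// size and digest of the uncompressed content
	return nil
}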
39 changes: 27 additions & 12 deletions pkg/content/iowriter.go
@@ -15,42 +15,57 @@ type IoContentWriter struct {
writer io.Writer
digester digest.Digester
size int64
hash *digest.Digest
}

// NewIoContentWriter create a new IoContentWriter. blocksize is the size of the block to copy,
// in bytes, between the parent and child. The default, when 0, is to simply use
// whatever golang defaults to with io.Copy
func NewIoContentWriter(writer io.Writer, blocksize int) content.Writer {
// NewIoContentWriter creates a new IoContentWriter.
//
// By default, it calculates the hash when writing. If an expected hash is
// pre-provided via WithInputHash or WithOutputHash, it skips calculating the
// corresponding hash. Skipping the hash is intended to be used only if you
// are confident about the validity of the data being passed to the writer,
// and wish to save on the hashing time.
func NewIoContentWriter(writer io.Writer, opts ...WriterOpt) content.Writer {
w := writer
if w == nil {
w = ioutil.Discard
}
// process opts for default
wOpts := DefaultWriterOpts()
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil
}
}
ioc := &IoContentWriter{
writer: w,
digester: digest.Canonical.Digester(),
// we take the OutputHash, since the InputHash goes to the passthrough writer,
// which then passes the processed output to us
hash: wOpts.OutputHash,
}
return NewPassthroughWriter(ioc, func(r io.Reader, w io.Writer, done chan<- error) {
// write out the data to the io writer
var (
err error
)
if blocksize == 0 {
_, err = io.Copy(w, r)
} else {
b := make([]byte, blocksize, blocksize)
_, err = io.CopyBuffer(w, r, b)
}
// we could use io.Copy, but with the default blocksize it is identical to
// io.CopyBuffer; otherwise we would need some way for the caller to say
// "use io.Copy", when the choice should not matter to them
b := make([]byte, wOpts.Blocksize, wOpts.Blocksize)
_, err = io.CopyBuffer(w, r, b)
done <- err
})
}, opts...)
}

func (w *IoContentWriter) Write(p []byte) (n int, err error) {
n, err = w.writer.Write(p)
if err != nil {
return 0, err
}
w.digester.Hash().Write(p[:n])
w.size += int64(n)
if w.hash == nil {
w.digester.Hash().Write(p[:n])
}
return
}

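
Typical use wraps a plain io.Writer, such as a file, so it can stand in where a containerd content.Writer is expected. A sketch, again assuming this package is imported as orascontent, with outputDigest a hypothetical pre-computed digest of the written data:

f, err := os.Create("layer.tar")
if err != nil {
	return err
}
// with WithOutputHash supplied, Write skips the internal digester and
// trusts the provided digest instead
w := orascontent.NewIoContentWriter(f, orascontent.WithOutputHash(outputDigest))
// w now satisfies the containerd content.Writer interface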
56 changes: 56 additions & 0 deletions pkg/content/opts.go
@@ -0,0 +1,56 @@
package content

import (
"github.com/opencontainers/go-digest"
)

type WriterOpts struct {
InputHash *digest.Digest
OutputHash *digest.Digest
Blocksize int
}

type WriterOpt func(*WriterOpts) error

func DefaultWriterOpts() WriterOpts {
return WriterOpts{
InputHash: nil,
OutputHash: nil,
Blocksize: DefaultBlocksize,
}
}

// WithInputHash provides the expected input hash to a writer. Writers
// may suppress their own calculation of a hash on the stream, taking this
// hash instead. If the Writer processes the data before passing it on to another
// Writer layer, this is the hash of the *input* stream.
//
// To have a blank hash, use WithInputHash(BlankHash).
func WithInputHash(hash digest.Digest) WriterOpt {
return func(w *WriterOpts) error {
w.InputHash = &hash
return nil
}
}

// WithOutputHash provides the expected output hash to a writer. Writers
// may suppress their own calculation of a hash on the stream, taking this
// hash instead. If the Writer processes the data before passing it on to another
// Writer layer, this is the hash of the *output* stream.
//
// To have a blank hash, use WithOutputHash(BlankHash).
func WithOutputHash(hash digest.Digest) WriterOpt {
return func(w *WriterOpts) error {
w.OutputHash = &hash
return nil
}
}

// WithBlocksize sets the blocksize used by the processor of data.
// The default is DefaultBlocksize, which is the same as that used by io.Copy
func WithBlocksize(blocksize int) WriterOpt {
return func(w *WriterOpts) error {
w.Blocksize = blocksize
return nil
}
}
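
Every constructor in this PR consumes these options the same way: start from DefaultWriterOpts and apply each WriterOpt in order, aborting on error. A condensed sketch of that pattern as a hypothetical helper:

func applyOpts(opts ...WriterOpt) (WriterOpts, error) {
	// start from the defaults, then let each option mutate them in order
	wOpts := DefaultWriterOpts()
	for _, opt := range opts {
		if err := opt(&wOpts); err != nil {
			// the constructors in this PR return a nil content.Writer
			// when an option fails
			return wOpts, err
		}
	}
	return wOpts, nil
}

// e.g. applyOpts(WithBlocksize(64*1024), WithInputHash(inDigest)), where
// inDigest is a hypothetical known digest of the input stream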
86 changes: 61 additions & 25 deletions pkg/content/passthrough.go
@@ -11,40 +11,52 @@ import (
// PassthroughWriter takes an input stream and passes it through to an underlying writer,
// while providing the ability to manipulate the stream before it gets passed through
type PassthroughWriter struct {
writer content.Writer
pipew *io.PipeWriter
digester digest.Digester
size int64
underlyingDigester digest.Digester
underlyingSize int64
reader *io.PipeReader
done chan error
writer content.Writer
pipew *io.PipeWriter
digester digest.Digester
size int64
underlyingWriter *underlyingWriter
reader *io.PipeReader
hash *digest.Digest
done chan error
}

// NewPassthroughWriter creates a pass-through writer that allows for processing
// the content via an arbitrary function. The function should do whatever processing it
// wants, reading from the Reader and writing to the Writer. When done, it
// must signal completion by sending an error or nil on the done channel
func NewPassthroughWriter(writer content.Writer, f func(r io.Reader, w io.Writer, done chan<- error)) content.Writer {
func NewPassthroughWriter(writer content.Writer, f func(r io.Reader, w io.Writer, done chan<- error), opts ...WriterOpt) content.Writer {
// process opts for default
wOpts := DefaultWriterOpts()
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil
}
}

r, w := io.Pipe()
pw := &PassthroughWriter{
writer: writer,
pipew: w,
digester: digest.Canonical.Digester(),
underlyingDigester: digest.Canonical.Digester(),
reader: r,
done: make(chan error, 1),
writer: writer,
pipew: w,
digester: digest.Canonical.Digester(),
underlyingWriter: &underlyingWriter{
writer: writer,
digester: digest.Canonical.Digester(),
hash: wOpts.OutputHash,
},
reader: r,
hash: wOpts.InputHash,
done: make(chan error, 1),
}
uw := &underlyingWriter{
pw: pw,
}
go f(r, uw, pw.done)
go f(r, pw.underlyingWriter, pw.done)
return pw
}

func (pw *PassthroughWriter) Write(p []byte) (n int, err error) {
n, err = pw.pipew.Write(p)
pw.digester.Hash().Write(p[:n])
if pw.hash == nil {
pw.digester.Hash().Write(p[:n])
}
pw.size += int64(n)
return
}
@@ -57,6 +69,9 @@ func (pw *PassthroughWriter) Close() error {

// Digest may return an empty digest, or panic, until committed.
func (pw *PassthroughWriter) Digest() digest.Digest {
if pw.hash != nil {
return *pw.hash
}
return pw.digester.Digest()
}

@@ -71,7 +86,10 @@ func (pw *PassthroughWriter) Commit(ctx context.Context, size int64, expected di
if err != nil && err != io.EOF {
return err
}
return pw.writer.Commit(ctx, pw.underlyingSize, pw.underlyingDigester.Digest(), opts...)

// Some underlying writers will validate an expected digest, so we need the option to pass it
// that digest. That is why we calculate the digest of the underlying writer throughout the write process.
return pw.writer.Commit(ctx, pw.underlyingWriter.size, pw.underlyingWriter.Digest(), opts...)
}

// Status returns the current state of write
@@ -87,17 +105,35 @@ func (pw *PassthroughWriter) Truncate(size int64) error {
// underlyingWriter implementation of io.Writer to write to the underlying
// io.Writer
type underlyingWriter struct {
pw *PassthroughWriter
writer content.Writer
digester digest.Digester
size int64
hash *digest.Digest
}

// Write writes to the underlying writer
func (u *underlyingWriter) Write(p []byte) (int, error) {
n, err := u.pw.writer.Write(p)
n, err := u.writer.Write(p)
if err != nil {
return 0, err
}

u.pw.underlyingSize += int64(len(p))
u.pw.underlyingDigester.Hash().Write(p)
if u.hash == nil {
u.digester.Hash().Write(p)
}
u.size += int64(len(p))
return n, nil
}

// Size returns the total size written
func (u *underlyingWriter) Size() int64 {
return u.size
}

// Digest may return an empty digest, or panic, until committed.
func (u *underlyingWriter) Digest() digest.Digest {
if u.hash != nil {
return *u.hash
}
return u.digester.Digest()
}
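
Putting it together: the caller supplies the processing function, which reads raw input from r, writes processed bytes to w, and must send an error or nil on done when finished. A minimal sketch with a pass-through (no-op) processor, where dest is a hypothetical underlying content.Writer and this package is imported as orascontent:

pw := orascontent.NewPassthroughWriter(dest, func(r io.Reader, w io.Writer, done chan<- error) {
	// a real processor would transform the stream here; this one
	// copies input through unchanged
	_, err := io.Copy(w, r)
	done <- err
}, orascontent.WithBlocksize(orascontent.DefaultBlocksize))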