-
Notifications
You must be signed in to change notification settings - Fork 183
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
create pass-through writer and iowriter
Signed-off-by: Avi Deitcher <avi@deitcher.net>
- Loading branch information
Showing
3 changed files
with
297 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
package content | ||
|
||
import ( | ||
"context" | ||
"io" | ||
|
||
"github.com/containerd/containerd/content" | ||
"github.com/opencontainers/go-digest" | ||
) | ||
|
||
// IoContentWriter writer that wraps an io.Writer, so the results can be streamed to | ||
// an open io.Writer. For example, can be used to pull a layer and write it to a file, or device. | ||
type IoContentWriter struct { | ||
writer io.Writer | ||
digester digest.Digester | ||
size int64 | ||
} | ||
|
||
// NewIoContentWriter create a new IoContentWriter. blocksize is the size of the block to copy, | ||
// in bytes, between the parent and child. The default, when 0, is to simply use | ||
// whatever golang defaults to with io.Copy | ||
func NewIoContentWriter(writer io.Writer, blocksize int) content.Writer { | ||
ioc := &IoContentWriter{ | ||
writer: writer, | ||
digester: digest.Canonical.Digester(), | ||
} | ||
return NewPassthroughWriter(ioc, func(r io.Reader, w io.Writer, done chan<- error ) { | ||
// write out the data to the io writer | ||
var ( | ||
err error | ||
) | ||
// nil writer means ignore it | ||
if ioc.writer == nil { | ||
done <- err | ||
return | ||
} | ||
if blocksize == 0 { | ||
_, err = io.Copy(w, r) | ||
} else { | ||
b := make([]byte, blocksize, blocksize) | ||
_, err = io.CopyBuffer(w, r, b) | ||
} | ||
done <- err | ||
}) | ||
} | ||
|
||
func (w *IoContentWriter) Write(p []byte) (n int, err error) { | ||
var ( | ||
l int | ||
) | ||
if w.writer != nil { | ||
l, err = w.writer.Write(p) | ||
if err != nil { | ||
return 0, err | ||
} | ||
} else { | ||
l = len(p) | ||
// nothing to write | ||
} | ||
w.digester.Hash().Write(p[:l]) | ||
w.size += int64(l) | ||
return | ||
} | ||
|
||
func (w *IoContentWriter) Close() error { | ||
return nil | ||
} | ||
|
||
// Digest may return empty digest or panics until committed. | ||
func (w *IoContentWriter) Digest() digest.Digest { | ||
return w.digester.Digest() | ||
} | ||
|
||
// Commit commits the blob (but no roll-back is guaranteed on an error). | ||
// size and expected can be zero-value when unknown. | ||
// Commit always closes the writer, even on error. | ||
// ErrAlreadyExists aborts the writer. | ||
func (w *IoContentWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { | ||
return nil | ||
} | ||
|
||
// Status returns the current state of write | ||
func (w *IoContentWriter) Status() (content.Status, error) { | ||
return content.Status{}, nil | ||
} | ||
|
||
// Truncate updates the size of the target blob | ||
func (w *IoContentWriter) Truncate(size int64) error { | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
package content | ||
|
||
import ( | ||
"context" | ||
"io" | ||
|
||
"github.com/containerd/containerd/content" | ||
"github.com/opencontainers/go-digest" | ||
) | ||
|
||
// PassthroughWriter takes an input stream and passes it through to an underlying writer, | ||
// while providing the ability to manipulate the stream before it gets passed through | ||
type PassthroughWriter struct { | ||
writer content.Writer | ||
pipew *io.PipeWriter | ||
digester digest.Digester | ||
size int64 | ||
underlyingDigester digest.Digester | ||
underlyingSize int64 | ||
reader *io.PipeReader | ||
done chan error | ||
} | ||
|
||
// NewPassthroughWriter creates a pass-through writer that allows for processing | ||
// the content via an arbitrary function. The function should do whatever processing it | ||
// wants, reading from the Reader to the Writer. When done, it must indicate via | ||
// sending an error or nil to the Done | ||
func NewPassthroughWriter(writer content.Writer, f func(r io.Reader, w io.Writer, done chan<- error)) content.Writer { | ||
r, w := io.Pipe() | ||
pw := &PassthroughWriter{ | ||
writer: writer, | ||
pipew: w, | ||
digester: digest.Canonical.Digester(), | ||
underlyingDigester: digest.Canonical.Digester(), | ||
reader: r, | ||
done: make(chan error, 1), | ||
} | ||
uw := &underlyingWriter{ | ||
pw: pw, | ||
} | ||
go f(r, uw, pw.done) | ||
return pw | ||
} | ||
|
||
func (pw *PassthroughWriter) Write(p []byte) (n int, err error) { | ||
n, err = pw.pipew.Write(p) | ||
pw.digester.Hash().Write(p[:n]) | ||
pw.size += int64(n) | ||
return | ||
} | ||
|
||
func (pw *PassthroughWriter) Close() error { | ||
pw.pipew.Close() | ||
pw.writer.Close() | ||
return nil | ||
} | ||
|
||
// Digest may return empty digest or panics until committed. | ||
func (pw *PassthroughWriter) Digest() digest.Digest { | ||
return pw.digester.Digest() | ||
} | ||
|
||
// Commit commits the blob (but no roll-back is guaranteed on an error). | ||
// size and expected can be zero-value when unknown. | ||
// Commit always closes the writer, even on error. | ||
// ErrAlreadyExists aborts the writer. | ||
func (pw *PassthroughWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { | ||
pw.pipew.Close() | ||
err := <-pw.done | ||
pw.reader.Close() | ||
if err != nil && err != io.EOF { | ||
return err | ||
} | ||
return pw.writer.Commit(ctx, pw.underlyingSize, pw.underlyingDigester.Digest(), opts...) | ||
} | ||
|
||
// Status returns the current state of write | ||
func (pw *PassthroughWriter) Status() (content.Status, error) { | ||
return pw.writer.Status() | ||
} | ||
|
||
// Truncate updates the size of the target blob | ||
func (pw *PassthroughWriter) Truncate(size int64) error { | ||
return pw.writer.Truncate(size) | ||
} | ||
|
||
// underlyingWriter implementation of io.Writer to write to the underlying | ||
// io.Writer | ||
type underlyingWriter struct { | ||
pw *PassthroughWriter | ||
} | ||
|
||
// Write write to the underlying writer | ||
func (u *underlyingWriter) Write(p []byte) (int, error) { | ||
n, err := u.pw.writer.Write(p) | ||
if err != nil { | ||
return 0, err | ||
} | ||
|
||
u.pw.underlyingSize += int64(len(p)) | ||
u.pw.underlyingDigester.Hash().Write(p) | ||
return n, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
package content_test | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"io" | ||
"testing" | ||
|
||
ctrcontent "github.com/containerd/containerd/content" | ||
"github.com/deislabs/oras/pkg/content" | ||
digest "github.com/opencontainers/go-digest" | ||
ocispec "github.com/opencontainers/image-spec/specs-go/v1" | ||
) | ||
|
||
var ( | ||
testRef = "abc123" | ||
testContent = []byte("Hello World!") | ||
appendText = "1" | ||
modifiedContent = fmt.Sprintf("%s%s", testContent, appendText) | ||
testDescriptor = ocispec.Descriptor{ | ||
MediaType: ocispec.MediaTypeImageConfig, | ||
Digest: digest.FromBytes(testContent), | ||
Size: int64(len(testContent)), | ||
Annotations: map[string]string{ | ||
ocispec.AnnotationTitle: testRef, | ||
}, | ||
} | ||
modifiedDescriptor = ocispec.Descriptor{ | ||
MediaType: ocispec.MediaTypeImageConfig, | ||
Digest: digest.FromBytes([]byte(modifiedContent)), | ||
Size: int64(len(modifiedContent)), | ||
Annotations: map[string]string{ | ||
ocispec.AnnotationTitle: testRef, | ||
}, | ||
} | ||
) | ||
|
||
func TestPassthroughWriter(t *testing.T) { | ||
// simple pass through function that modifies the data just slightly | ||
f := func(r io.Reader, w io.Writer, done chan<- error) { | ||
var ( | ||
err error | ||
n int | ||
) | ||
for { | ||
b := make([]byte, 1024) | ||
n, err = r.Read(b) | ||
if err != nil && err != io.EOF { | ||
t.Fatalf("data read error: %v", err) | ||
break | ||
} | ||
l := n | ||
if n > len(b) { | ||
l = len(b) | ||
} | ||
|
||
// we change it just slightly | ||
b = b[:l] | ||
if l > 0 { | ||
b = append(b, 0x31) | ||
} | ||
if _, err := w.Write(b); err != nil { | ||
t.Fatalf("error writing to underlying writer: %v", err) | ||
break | ||
} | ||
if err == io.EOF { | ||
break | ||
} | ||
} | ||
done <- err | ||
} | ||
ctx := context.Background() | ||
mem := content.NewMemoryStore() | ||
memw, err := mem.Writer(ctx, ctrcontent.WithDescriptor(modifiedDescriptor)) | ||
if err != nil { | ||
t.Fatalf("unexpected error getting the memory store writer: %v", err) | ||
} | ||
writer := content.NewPassthroughWriter(memw, f) | ||
n, err := writer.Write(testContent) | ||
if err != nil { | ||
t.Fatalf("unexpected error on Write: %v", err) | ||
} | ||
if n != len(testContent) { | ||
t.Fatalf("wrote %d bytes instead of %d", n, len(testContent)) | ||
} | ||
if err := writer.Commit(ctx, testDescriptor.Size, testDescriptor.Digest); err != nil { | ||
t.Errorf("unexpected error on Commit: %v", err) | ||
} | ||
if digest := writer.Digest(); digest != testDescriptor.Digest { | ||
t.Errorf("mismatched digest: actual %v, expected %v", digest, testDescriptor.Digest) | ||
} | ||
|
||
// make sure the data is what we expected | ||
_, b, found := mem.Get(modifiedDescriptor) | ||
if !found { | ||
t.Fatalf("target descriptor not found in underlying memory store") | ||
} | ||
if len(b) != len(modifiedContent) { | ||
t.Errorf("unexpectedly got %d bytes instead of expected %d", len(b), len(modifiedContent)) | ||
} | ||
if string(b) != modifiedContent { | ||
t.Errorf("mismatched content, expected '%s', got '%s'", modifiedContent, string(b)) | ||
} | ||
} |