Skip to content

Commit

Permalink
Merge pull request containerd#559 from stevvooe/content-service-cleanup
Browse files Browse the repository at this point in the history
content: cleanup service and interfaces
  • Loading branch information
crosbymichael authored Feb 22, 2017
2 parents 0a5544d + c062a85 commit 4bce28a
Show file tree
Hide file tree
Showing 15 changed files with 534 additions and 303 deletions.
439 changes: 255 additions & 184 deletions api/services/content/content.pb.go

Large diffs are not rendered by default.

58 changes: 37 additions & 21 deletions api/services/content/content.proto
Original file line number Diff line number Diff line change
Expand Up @@ -133,26 +133,27 @@ message WriteRequest {
// Ref identifies the pre-commit object to write to.
string ref = 2;

// ExpectedSize can be set to have the service validate the total size of
// the of committed content.
// Total can be set to have the service validate the total size of the
// committed content.
//
// The latest value before or with the commit action message will be use to
// validate the content. It is only required on one message for the write.
// validate the content. If the offset overflows total, the service may
// report an error. It is only required on one message for the write.
//
// If the value is zero or less, no validation of the final content will be
// performed.
int64 expected_size = 3;
int64 total = 3;

// ExpectedDigest can be set to have the service validate the final content
// against the provided digest.
// Expected can be set to have the service validate the final content against
// the provided digest.
//
// If the digest is already present in the object store, an AlreadyPresent
// If the digest is already present in the object store, an AlreadyExists
// error will be returned.
//
// Only the latest version will be used to check the content against the
// digest. It is only required to include it on a single message, before or
// with the commit action message.
string expected_digest = 4 [(gogoproto.customtype) = "github.com/opencontainers/go-digest.Digest", (gogoproto.nullable) = false];
string expected = 4 [(gogoproto.customtype) = "github.com/opencontainers/go-digest.Digest", (gogoproto.nullable) = false];

// Offset specifies the number of bytes from the start at which to begin
// the write. If zero or less, the write will be from the start. This uses
Expand All @@ -172,20 +173,34 @@ message WriteResponse {
// should confirm that they match the intended result.
WriteAction action = 1;

// Offset provides the current "committed" size for the Write.
int64 offset = 2;
// StartedAt provides the time at which the write began.
//
// This must be set for stat and commit write actions. All other write
// actions may omit this.
google.protobuf.Timestamp started_at = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];

// UpdatedAt provides the last time of a successful write.
//
// This must be set for stat and commit write actions. All other write
// actions may omit this.
google.protobuf.Timestamp updated_at = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];

// Offset is the current committed size for the write.
int64 offset = 4;

// Total provides the current, expected total size of the write.
//
// We include this to provide consistency with the Status structure on the
// client writer.
//
// This is only valid on the Stat and Commit response.
int64 total = 5;

// Digest, if present, includes the digest up to the currently committed
// bytes. If action is commit, this field will be set. It is implementation
// defined if this is set for other actions, except abort. On abort, this
// will be empty.
string digest = 3 [(gogoproto.customtype) = "github.com/opencontainers/go-digest.Digest", (gogoproto.nullable) = false];

// StartedAt is the time at which the write first started.
google.protobuf.Timestamp started_at = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];

// UpdatedAt is the time the write was last updated.
google.protobuf.Timestamp updated_at = 5 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
string digest = 6 [(gogoproto.customtype) = "github.com/opencontainers/go-digest.Digest", (gogoproto.nullable) = false];
}

message StatusRequest {
Expand All @@ -194,8 +209,9 @@ message StatusRequest {
}

message StatusResponse {
string ref = 1;
int64 offset = 2;
google.protobuf.Timestamp started_at = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp updated_at = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp started_at = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp updated_at = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
string ref = 3;
int64 offset = 4;
int64 total = 5;
}
2 changes: 1 addition & 1 deletion cmd/dist/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func resolveContentStore(context *cli.Context) (*content.Store, error) {
root := context.GlobalString("root")
root := filepath.Join(context.GlobalString("root"), "content")
if !filepath.IsAbs(root) {
var err error
root, err = filepath.Abs(root)
Expand Down
12 changes: 9 additions & 3 deletions cmd/dist/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ var fetchCommand = cli.Command{

// getResolver prepares the resolver from the environment and options.
func getResolver(ctx contextpkg.Context) (remotes.Resolver, error) {
return remotes.ResolverFunc(func(ctx contextpkg.Context, locator string) (remotes.Remote, error) {
if !strings.HasPrefix(locator, "docker.io") {
return remotes.ResolverFunc(func(ctx contextpkg.Context, locator string) (remotes.Fetcher, error) {
if !strings.HasPrefix(locator, "docker.io") && !strings.HasPrefix(locator, "localhost:5000") {
return nil, errors.Errorf("unsupported locator: %q", locator)
}

Expand All @@ -113,12 +113,18 @@ func getResolver(ctx contextpkg.Context) (remotes.Resolver, error) {
prefix = strings.TrimPrefix(locator, "docker.io/")
)

if strings.HasPrefix(locator, "localhost:5000") {
base.Scheme = "http"
base.Host = "localhost:5000"
prefix = strings.TrimPrefix(locator, "localhost:5000/")
}

token, err := getToken(ctx, "repository:"+prefix+":pull")
if err != nil {
return nil, err
}

return remotes.RemoteFunc(func(ctx contextpkg.Context, object string, hints ...string) (io.ReadCloser, error) {
return remotes.FetcherFunc(func(ctx contextpkg.Context, object string, hints ...string) (io.ReadCloser, error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(
logrus.Fields{
"prefix": prefix, // or repo?
Expand Down
2 changes: 1 addition & 1 deletion cmd/dist/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,6 @@ var ingestCommand = cli.Command{
// TODO(stevvooe): Allow ingest to be reentrant. Currently, we expect
// all data to be written in a single invocation. Allow multiple writes
// to the same transaction key followed by a commit.
return content.WriteBlob(ctx, ingester, os.Stdin, ref, expectedSize, expectedDigest)
return content.WriteBlob(ctx, ingester, ref, os.Stdin, expectedSize, expectedDigest)
},
}
4 changes: 3 additions & 1 deletion cmd/dist/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ distribution tool
EnvVar: "CONTAINERD_FETCH_TIMEOUT",
},
cli.StringFlag{
// TODO(stevvooe): for now, we allow circumventing the GRPC. Once
// we have clear separation, this will likely go away.
Name: "root",
Usage: "path to content store root",
Value: "/tmp/content", // TODO(stevvooe): for now, just use the PWD/.content
Value: "/var/lib/containerd",
},
cli.StringFlag{
Name: "socket, s",
Expand Down
38 changes: 27 additions & 11 deletions content/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"io"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"

contentapi "github.com/docker/containerd/api/services/content"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
Expand Down Expand Up @@ -83,10 +86,10 @@ type remoteIngester struct {
client contentapi.ContentClient
}

func (ri *remoteIngester) Writer(ctx context.Context, ref string) (Writer, error) {
wrclient, offset, err := ri.negotiate(ctx, ref)
func (ri *remoteIngester) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (Writer, error) {
wrclient, offset, err := ri.negotiate(ctx, ref, size, expected)
if err != nil {
return nil, err
return nil, rewriteGRPCError(err)
}

return &remoteWriter{
Expand All @@ -95,15 +98,17 @@ func (ri *remoteIngester) Writer(ctx context.Context, ref string) (Writer, error
}, nil
}

func (ri *remoteIngester) negotiate(ctx context.Context, ref string) (contentapi.Content_WriteClient, int64, error) {
func (ri *remoteIngester) negotiate(ctx context.Context, ref string, size int64, expected digest.Digest) (contentapi.Content_WriteClient, int64, error) {
wrclient, err := ri.client.Write(ctx)
if err != nil {
return nil, 0, err
}

if err := wrclient.Send(&contentapi.WriteRequest{
Action: contentapi.WriteActionStat,
Ref: ref,
Action: contentapi.WriteActionStat,
Ref: ref,
Total: size,
Expected: expected,
}); err != nil {
return nil, 0, err
}
Expand Down Expand Up @@ -192,20 +197,20 @@ func (rw *remoteWriter) Write(p []byte) (n int, err error) {

func (rw *remoteWriter) Commit(size int64, expected digest.Digest) error {
resp, err := rw.send(&contentapi.WriteRequest{
Action: contentapi.WriteActionCommit,
ExpectedSize: size,
ExpectedDigest: expected,
Action: contentapi.WriteActionCommit,
Total: size,
Expected: expected,
})
if err != nil {
return err
return rewriteGRPCError(err)
}

if size != 0 && resp.Offset != size {
return errors.Errorf("unexpected size: %v != %v", resp.Offset, size)
}

if expected != "" && resp.Digest != expected {
return errors.New("unexpected digest")
return errors.Errorf("unexpected digest: %v != %v", resp.Digest, expected)
}

return nil
Expand All @@ -214,3 +219,14 @@ func (rw *remoteWriter) Commit(size int64, expected digest.Digest) error {
func (rw *remoteWriter) Close() error {
return rw.client.CloseSend()
}

func rewriteGRPCError(err error) error {
switch grpc.Code(errors.Cause(err)) {
case codes.AlreadyExists:
return errExists
case codes.NotFound:
return errNotFound
}

return err
}
8 changes: 7 additions & 1 deletion content/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

var (
errNotFound = errors.New("content: not found")
errExists = errors.New("content: exists")

BufPool = sync.Pool{
New: func() interface{} {
Expand All @@ -33,6 +34,7 @@ type Provider interface {
type Status struct {
Ref string
Offset int64
Total int64
StartedAt time.Time
UpdatedAt time.Time
}
Expand All @@ -45,9 +47,13 @@ type Writer interface {
}

type Ingester interface {
Writer(ctx context.Context, ref string) (Writer, error)
Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (Writer, error)
}

func IsNotFound(err error) bool {
return errors.Cause(err) == errNotFound
}

func IsExists(err error) bool {
return errors.Cause(err) == errExists
}
10 changes: 5 additions & 5 deletions content/content_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestContentWriter(t *testing.T) {
t.Fatal("ingest dir should be created", err)
}

cw, err := cs.Writer(ctx, "myref")
cw, err := cs.Writer(ctx, "myref", 0, "")
if err != nil {
t.Fatal(err)
}
Expand All @@ -39,13 +39,13 @@ func TestContentWriter(t *testing.T) {
}

// reopen, so we can test things
cw, err = cs.Writer(ctx, "myref")
cw, err = cs.Writer(ctx, "myref", 0, "")
if err != nil {
t.Fatal(err)
}

// make sure that second resume also fails
if _, err = cs.Writer(ctx, "myref"); err == nil {
if _, err = cs.Writer(ctx, "myref", 0, ""); err == nil {
// TODO(stevvooe): This also works across processes. Need to find a way
// to test that, as well.
t.Fatal("no error on second resume")
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestContentWriter(t *testing.T) {
t.Fatal(err)
}

cw, err = cs.Writer(ctx, "aref")
cw, err = cs.Writer(ctx, "aref", 0, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -269,7 +269,7 @@ func checkBlobPath(t *testing.T, cs *Store, dgst digest.Digest) string {
}

func checkWrite(t checker, ctx context.Context, cs *Store, dgst digest.Digest, p []byte) digest.Digest {
if err := WriteBlob(ctx, cs, bytes.NewReader(p), dgst.String(), int64(len(p)), dgst); err != nil {
if err := WriteBlob(ctx, cs, dgst.String(), bytes.NewReader(p), int64(len(p)), dgst); err != nil {
t.Fatal(err)
}

Expand Down
Loading

0 comments on commit 4bce28a

Please sign in to comment.