forked from kubeflow/pipelines
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add GCS support and tests (kubeflow#1105)
* Adding in GCS support Adding in testing Removing mockapi and using google-cloud-go-testing instead, removing unnecessary methods, cleaning up code Changing back dockerfile name Rebasing on master Fixing import statement Reverting kfstorage rename to storage, changing gcs import Fixing import statement Changing import in test Combining tests into watcher_test, putting mocks into a testutils package Removing unnecessary suite run, renaming testutils to mocks Adding more test cases, accounting for lack of model name in passed in storageURI Changing iterator retrieval logic Rebasing and cleaning code * Returning a warning if queried object doesn't exist in bucket, resolving test * Rebasing, removing unused import, refactoring * Adding missing parameter to NewWatcher call
- Loading branch information
Showing
9 changed files
with
529 additions
and
54 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
package mocks | ||
|
||
import ( | ||
"bytes" | ||
gstorage "cloud.google.com/go/storage" | ||
"context" | ||
"fmt" | ||
"github.com/googleapis/google-cloud-go-testing/storage/stiface" | ||
"google.golang.org/api/iterator" | ||
"strings" | ||
) | ||
|
||
type mockGCSClient struct { | ||
stiface.Client | ||
buckets map[string]*mockBucket | ||
} | ||
|
||
type mockBucket struct { | ||
attrs *gstorage.BucketAttrs | ||
objects map[string]*gstorage.ObjectAttrs | ||
} | ||
|
||
func NewMockClient() stiface.Client { | ||
return &mockGCSClient{buckets: map[string]*mockBucket{}} | ||
} | ||
|
||
func (c *mockGCSClient) Bucket(name string) stiface.BucketHandle { | ||
return mockBucketHandle{c: c, name: name} | ||
} | ||
|
||
type mockBucketHandle struct { | ||
stiface.BucketHandle | ||
c *mockGCSClient | ||
name string | ||
} | ||
|
||
func (b mockBucketHandle) Create(_ context.Context, _ string, attrs *gstorage.BucketAttrs) error { | ||
if _, ok := b.c.buckets[b.name]; ok { | ||
return fmt.Errorf("bucket %q already exists", b.name) | ||
} | ||
if attrs == nil { | ||
attrs = &gstorage.BucketAttrs{} | ||
} | ||
attrs.Name = b.name | ||
b.c.buckets[b.name] = &mockBucket{attrs: attrs, objects: map[string]*gstorage.ObjectAttrs{}} | ||
return nil | ||
} | ||
|
||
func (b mockBucketHandle) Objects(ctx context.Context, query *gstorage.Query) stiface.ObjectIterator { | ||
var items []*gstorage.ObjectAttrs | ||
objs := b.c.buckets[b.name].objects | ||
for key, element := range objs { | ||
if strings.Contains(key, query.Prefix){ | ||
items = append(items, element) | ||
} | ||
} | ||
return &mockObjectIterator{b: b, items: items} | ||
} | ||
|
||
type mockObjectIterator struct { | ||
stiface.ObjectIterator | ||
b mockBucketHandle | ||
items []*gstorage.ObjectAttrs | ||
} | ||
|
||
func (i *mockObjectIterator) Next() (*gstorage.ObjectAttrs, error) { | ||
if len(i.items) == 0 { | ||
return nil, iterator.Done | ||
} | ||
item := i.items[0] | ||
i.items = i.items[1:] | ||
return item, nil | ||
} | ||
|
||
func (b mockBucketHandle) Object(name string) stiface.ObjectHandle { | ||
return mockObjectHandle{c: b.c, bucketName: b.name, name: name} | ||
} | ||
|
||
type mockObjectHandle struct { | ||
stiface.ObjectHandle | ||
c *mockGCSClient | ||
bucketName string | ||
name string | ||
} | ||
|
||
func (o mockObjectHandle) Attrs(context.Context) (*gstorage.ObjectAttrs, error) { | ||
bkt, ok := o.c.buckets[o.bucketName] | ||
if !ok { | ||
return nil, fmt.Errorf("bucket %q not found", o.bucketName) | ||
} | ||
contents, ok := bkt.objects[o.name] | ||
if !ok { | ||
return nil, gstorage.ErrObjectNotExist | ||
} | ||
return contents, nil | ||
} | ||
|
||
func (o mockObjectHandle) NewReader(context.Context) (stiface.Reader, error) { | ||
bkt, ok := o.c.buckets[o.bucketName] | ||
if !ok { | ||
return nil, fmt.Errorf("bucket %q not found", o.bucketName) | ||
} | ||
contents, ok := bkt.objects[o.name] | ||
if !ok { | ||
return nil, fmt.Errorf("object %q not found in bucket %q", o.name, o.bucketName) | ||
} | ||
return mockReader{r: bytes.NewReader(contents.MD5)}, nil | ||
} | ||
|
||
func (o mockObjectHandle) NewWriter(context.Context) stiface.Writer { | ||
attrs := &gstorage.ObjectAttrs{ | ||
Bucket: o.bucketName, | ||
Name: o.name, | ||
MD5: nil, | ||
} | ||
o.c.buckets[o.bucketName].objects[o.name] = attrs | ||
return &mockWriter{o: o, obj: attrs} | ||
} | ||
|
||
type mockReader struct { | ||
stiface.Reader | ||
r *bytes.Reader | ||
} | ||
|
||
func (r mockReader) Read(buf []byte) (int, error) { | ||
return r.r.Read(buf) | ||
} | ||
|
||
func (r mockReader) Close() error { | ||
return nil | ||
} | ||
|
||
type mockWriter struct { | ||
stiface.Writer | ||
o mockObjectHandle | ||
buf bytes.Buffer | ||
obj *gstorage.ObjectAttrs | ||
} | ||
|
||
func (w *mockWriter) Write(data []byte) (int, error) { | ||
int, err := w.buf.Write(data) | ||
w.obj.MD5 = data | ||
return int, err | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package mocks | ||
|
||
import ( | ||
"fmt" | ||
"github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/service/s3" | ||
"github.com/aws/aws-sdk-go/service/s3/s3iface" | ||
"github.com/aws/aws-sdk-go/service/s3/s3manager" | ||
"github.com/golang/protobuf/proto" | ||
) | ||
|
||
type MockS3Client struct { | ||
s3iface.S3API | ||
} | ||
|
||
func (m *MockS3Client) ListObjects(*s3.ListObjectsInput) (*s3.ListObjectsOutput, error) { | ||
return &s3.ListObjectsOutput{ | ||
Contents: []*s3.Object{ | ||
{ | ||
Key: proto.String("model.pt"), | ||
}, | ||
}, | ||
}, nil | ||
} | ||
|
||
type MockS3Downloader struct { | ||
} | ||
|
||
func (m *MockS3Downloader) DownloadWithIterator(aws.Context, s3manager.BatchDownloadIterator, ...func(*s3manager.Downloader)) error { | ||
return nil | ||
} | ||
|
||
type MockS3FailDownloader struct { | ||
Err error | ||
} | ||
|
||
func (m *MockS3FailDownloader) DownloadWithIterator(aws.Context, s3manager.BatchDownloadIterator, ...func(*s3manager.Downloader)) error { | ||
var errs []s3manager.Error | ||
errs = append(errs, s3manager.Error{ | ||
OrigErr: fmt.Errorf("failed to download"), | ||
Bucket: aws.String("modelRepo"), | ||
Key: aws.String("model1/model.pt"), | ||
}) | ||
return s3manager.NewBatchError("BatchedDownloadIncomplete", "some objects have failed to download.", errs) | ||
} |
Oops, something went wrong.