diff --git a/changelog/unreleased/make-dataprovider-return-metadata.md b/changelog/unreleased/make-dataprovider-return-metadata.md new file mode 100644 index 0000000000..f928c72ba2 --- /dev/null +++ b/changelog/unreleased/make-dataprovider-return-metadata.md @@ -0,0 +1,5 @@ +Change: dataproviders now return file metadata + +Dataprovider drivers can now return file metadata. When the resource info contains a file id, the mtime or an etag, these will be included in the response as the corresponding http headers. + +https://github.com/cs3org/reva/pull/3154 diff --git a/internal/http/services/owncloud/ocdav/put.go b/internal/http/services/owncloud/ocdav/put.go index fe7fab892b..2ef1415149 100644 --- a/internal/http/services/owncloud/ocdav/put.go +++ b/internal/http/services/owncloud/ocdav/put.go @@ -24,7 +24,6 @@ import ( "path" "strconv" "strings" - "time" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" @@ -36,8 +35,6 @@ import ( "github.com/cs3org/reva/v2/pkg/appctx" "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/rhttp" - "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" - "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" "github.com/rs/zerolog" ) @@ -295,41 +292,19 @@ func (s *svc) handlePut(ctx context.Context, w http.ResponseWriter, r *http.Requ return } - sReq := &provider.StatRequest{Ref: ref} - if chunking.IsChunked(ref.Path) { - chunk, err := chunking.GetChunkBLOBInfo(ref.Path) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - sReq = &provider.StatRequest{Ref: &provider.Reference{ - ResourceId: ref.ResourceId, - Path: chunk.Path, - }} + // copy headers if they are present + if httpRes.Header.Get(net.HeaderETag) != "" { + w.Header().Set(net.HeaderETag, httpRes.Header.Get(net.HeaderETag)) } - - // stat again to check the new file's metadata - sRes, err := client.Stat(ctx, sReq) - if err != nil { - 
log.Error().Err(err).Msg("error sending grpc stat request") - w.WriteHeader(http.StatusInternalServerError) - return + if httpRes.Header.Get(net.HeaderOCETag) != "" { + w.Header().Set(net.HeaderOCETag, httpRes.Header.Get(net.HeaderOCETag)) } - - if sRes.Status.Code != rpc.Code_CODE_OK { - errors.HandleErrorStatus(&log, w, sRes.Status) - return + if httpRes.Header.Get(net.HeaderOCFileID) != "" { + w.Header().Set(net.HeaderOCFileID, httpRes.Header.Get(net.HeaderOCFileID)) + } + if httpRes.Header.Get(net.HeaderLastModified) != "" { + w.Header().Set(net.HeaderLastModified, httpRes.Header.Get(net.HeaderLastModified)) } - - newInfo := sRes.Info - - w.Header().Add(net.HeaderContentType, newInfo.MimeType) - w.Header().Set(net.HeaderETag, newInfo.Etag) - w.Header().Set(net.HeaderOCFileID, storagespace.FormatResourceID(*newInfo.Id)) - w.Header().Set(net.HeaderOCETag, newInfo.Etag) - t := utils.TSToTime(newInfo.Mtime).UTC() - lastModifiedString := t.Format(time.RFC1123Z) - w.Header().Set(net.HeaderLastModified, lastModifiedString) // file was new // FIXME make created flag a property on the InitiateFileUploadResponse diff --git a/pkg/rhttp/datatx/manager/simple/simple.go b/pkg/rhttp/datatx/manager/simple/simple.go index 495a4e4aea..a18a6fa3c7 100644 --- a/pkg/rhttp/datatx/manager/simple/simple.go +++ b/pkg/rhttp/datatx/manager/simple/simple.go @@ -20,12 +20,14 @@ package simple import ( "net/http" + "time" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/net" "github.com/cs3org/reva/v2/pkg/appctx" "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/events" @@ -33,6 +35,8 @@ import ( "github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/registry" "github.com/cs3org/reva/v2/pkg/rhttp/datatx/utils/download" "github.com/cs3org/reva/v2/pkg/storage" + 
"github.com/cs3org/reva/v2/pkg/storagespace" + "github.com/cs3org/reva/v2/pkg/utils" ) func init() { @@ -81,13 +85,24 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { defer r.Body.Close() ref := &provider.Reference{Path: fn} - err := fs.Upload(ctx, ref, r.Body, func(owner *userpb.UserId, ref *provider.Reference) { + info, err := fs.Upload(ctx, ref, r.Body, func(owner *userpb.UserId, ref *provider.Reference) { if err := datatx.EmitFileUploadedEvent(owner, ref, m.publisher); err != nil { sublog.Error().Err(err).Msg("failed to publish FileUploaded event") } }) switch v := err.(type) { case nil: + // set etag, mtime and file id + w.Header().Set(net.HeaderETag, info.Etag) + w.Header().Set(net.HeaderOCETag, info.Etag) + if info.Id != nil { + w.Header().Set(net.HeaderOCFileID, storagespace.FormatResourceID(*info.Id)) + } + if info.Mtime != nil { + t := utils.TSToTime(info.Mtime).UTC() + lastModifiedString := t.Format(time.RFC1123Z) + w.Header().Set(net.HeaderLastModified, lastModifiedString) + } w.WriteHeader(http.StatusOK) case errtypes.PartialContent: w.WriteHeader(http.StatusPartialContent) diff --git a/pkg/rhttp/datatx/manager/spaces/spaces.go b/pkg/rhttp/datatx/manager/spaces/spaces.go index a6dfc5db09..7f86e58eac 100644 --- a/pkg/rhttp/datatx/manager/spaces/spaces.go +++ b/pkg/rhttp/datatx/manager/spaces/spaces.go @@ -22,9 +22,11 @@ import ( "net/http" "path" "strings" + "time" userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/net" "github.com/cs3org/reva/v2/pkg/appctx" "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/events" @@ -34,6 +36,7 @@ import ( "github.com/cs3org/reva/v2/pkg/rhttp/router" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storagespace" + "github.com/cs3org/reva/v2/pkg/utils" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" 
) @@ -95,13 +98,25 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { ResourceId: &rid, Path: fn, } - err = fs.Upload(ctx, ref, r.Body, func(owner *userpb.UserId, ref *provider.Reference) { + var info provider.ResourceInfo + info, err = fs.Upload(ctx, ref, r.Body, func(owner *userpb.UserId, ref *provider.Reference) { if err := datatx.EmitFileUploadedEvent(owner, ref, m.publisher); err != nil { sublog.Error().Err(err).Msg("failed to publish FileUploaded event") } }) switch v := err.(type) { case nil: + // set etag, mtime and file id + w.Header().Set(net.HeaderETag, info.Etag) + w.Header().Set(net.HeaderOCETag, info.Etag) + if info.Id != nil { + w.Header().Set(net.HeaderOCFileID, storagespace.FormatResourceID(*info.Id)) + } + if info.Mtime != nil { + t := utils.TSToTime(info.Mtime).UTC() + lastModifiedString := t.Format(time.RFC1123Z) + w.Header().Set(net.HeaderLastModified, lastModifiedString) + } w.WriteHeader(http.StatusOK) case errtypes.PartialContent: w.WriteHeader(http.StatusPartialContent) diff --git a/pkg/rhttp/datatx/manager/tus/tus.go b/pkg/rhttp/datatx/manager/tus/tus.go index 1febf18474..989463b630 100644 --- a/pkg/rhttp/datatx/manager/tus/tus.go +++ b/pkg/rhttp/datatx/manager/tus/tus.go @@ -133,10 +133,12 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { switch method { case "POST": + // set etag, mtime and file id handler.PostFile(w, r) case "HEAD": handler.HeadFile(w, r) case "PATCH": + // set etag, mtime and file id setExpiresHeader(fs, w, r) handler.PatchFile(w, r) case "DELETE": diff --git a/pkg/storage/fs/nextcloud/nextcloud.go b/pkg/storage/fs/nextcloud/nextcloud.go index bd75318d5f..618eefd11a 100644 --- a/pkg/storage/fs/nextcloud/nextcloud.go +++ b/pkg/storage/fs/nextcloud/nextcloud.go @@ -396,8 +396,19 @@ func (nc *StorageDriver) InitiateUpload(ctx context.Context, ref *provider.Refer } // Upload as defined in the storage.FS interface -func (nc *StorageDriver) Upload(ctx context.Context, ref 
*provider.Reference, r io.ReadCloser, _ storage.UploadFinishedFunc) error { - return nc.doUpload(ctx, ref.Path, r) +func (nc *StorageDriver) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, _ storage.UploadFinishedFunc) (provider.ResourceInfo, error) { + err := nc.doUpload(ctx, ref.Path, r) + if err != nil { + return provider.ResourceInfo{}, err + } + + // return id, etag and mtime + ri, err := nc.GetMD(ctx, ref, []string{}, []string{"id", "etag", "mtime"}) + if err != nil { + return provider.ResourceInfo{}, err + } + + return *ri, nil } // Download as defined in the storage.FS interface diff --git a/pkg/storage/fs/nextcloud/nextcloud_server_mock.go b/pkg/storage/fs/nextcloud/nextcloud_server_mock.go index dcfe249960..dd083b899c 100644 --- a/pkg/storage/fs/nextcloud/nextcloud_server_mock.go +++ b/pkg/storage/fs/nextcloud/nextcloud_server_mock.go @@ -114,6 +114,8 @@ var responses = map[string]Response{ `POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/InitiateUpload {"ref":{"path":"/file"},"uploadLength":0,"metadata":{"providerID":""}}`: {200, `{"simple": "yes","tus": "yes"}`, serverStateEmpty}, `POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/InitiateUpload {"ref":{"resource_id":{"storage_id":"f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c"},"path":"/versionedFile"},"uploadLength":0,"metadata":{}}`: {200, `{"simple": "yes","tus": "yes"}`, serverStateEmpty}, + `POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/GetMD {"ref":{"path":"/yes"},"mdKeys":[]}`: {200, `{"opaque":{},"type":1,"id":{"opaque_id":"fileid-/yes"},"checksum":{},"etag":"deadbeef","mime_type":"text/plain","mtime":{"seconds":1234567890},"path":"/yes","permission_set":{},"size":1,"canonical_metadata":{},"arbitrary_metadata":{}}`, serverStateEmpty}, + `POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/ListFolder {"ref":{"path":"/"},"mdKeys":null}`: {200, 
`[{"opaque":{},"type":2,"id":{"opaque_id":"fileid-/subdir"},"checksum":{},"etag":"deadbeef","mime_type":"text/plain","mtime":{"seconds":1234567890},"path":"/subdir","permission_set":{},"size":12345,"canonical_metadata":{},"owner":{"opaque_id":"f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c"},"arbitrary_metadata":{"metadata":{"da":"ta","some":"arbi","trary":"meta"}}}]`, serverStateEmpty}, `POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/ListFolder {"ref":{"path":"/Shares"},"mdKeys":null} EMPTY`: {404, ``, serverStateEmpty}, @@ -157,6 +159,7 @@ var responses = map[string]Response{ // `POST /apps/sciencemesh/~tester/api/storage/ListFolder {"ref":{"resource_id":{"storage_id":"storage-id","opaque_id":"opaque-id"},"path":"/some"},"mdKeys":["val1","val2","val3"]}`: {200, `[{"opaque":{},"type":1,"id":{"opaque_id":"fileid-/path"},"checksum":{},"etag":"deadbeef","mime_type":"text/plain","mtime":{"seconds":1234567890},"path":"/path","permission_set":{},"size":12345,"canonical_metadata":{},"arbitrary_metadata":{"metadata":{"da":"ta","some":"arbi","trary":"meta"}}}]`, serverStateEmpty}, `POST /apps/sciencemesh/~tester/api/storage/InitiateUpload {"ref":{"resource_id":{"storage_id":"storage-id","opaque_id":"opaque-id"},"path":"/some/path"},"uploadLength":12345,"metadata":{"key1":"val1","key2":"val2","key3":"val3"}}`: {200, `{ "not":"sure", "what": "should be", "returned": "here" }`, serverStateEmpty}, `PUT /apps/sciencemesh/~tester/api/storage/Upload/some/file/path.txt shiny!`: {200, ``, serverStateEmpty}, + `POST /apps/sciencemesh/~tester/api/storage/GetMD {"ref":{"resource_id":{"storage_id":"storage-id","opaque_id":"opaque-id"},"path":"some/file/path.txt"},"mdKeys":[]}`: {200, 
`{"opaque":{},"type":1,"id":{"opaque_id":"fileid-/some/path"},"checksum":{},"etag":"deadbeef","mime_type":"text/plain","mtime":{"seconds":1234567890},"path":"/versionedFile","permission_set":{},"size":12345,"canonical_metadata":{},"arbitrary_metadata":{"metadata":{"key1":"val1","key2":"val2","key3":"val3"}}}`, serverStateEmpty}, `GET /apps/sciencemesh/~tester/api/storage/Download/some/file/path.txt `: {200, `the contents of the file`, serverStateEmpty}, `POST /apps/sciencemesh/~tester/api/storage/ListRevisions {"resource_id":{"storage_id":"storage-id","opaque_id":"opaque-id"},"path":"/some/path"}`: {200, `[{"opaque":{"map":{"some":{"value":"ZGF0YQ=="}}},"key":"version-12","size":12345,"mtime":1234567890,"etag":"deadb00f"},{"opaque":{"map":{"different":{"value":"c3R1ZmY="}}},"key":"asdf","size":12345,"mtime":1234567890,"etag":"deadbeef"}]`, serverStateEmpty}, `GET /apps/sciencemesh/~tester/api/storage/DownloadRevision/some%2Frevision/some/file/path.txt `: {200, `the contents of that revision`, serverStateEmpty}, diff --git a/pkg/storage/fs/nextcloud/nextcloud_test.go b/pkg/storage/fs/nextcloud/nextcloud_test.go index d74dc693ff..c33f366e75 100644 --- a/pkg/storage/fs/nextcloud/nextcloud_test.go +++ b/pkg/storage/fs/nextcloud/nextcloud_test.go @@ -447,9 +447,11 @@ var _ = Describe("Nextcloud", func() { } stringReader := strings.NewReader("shiny!") stringReadCloser := io.NopCloser(stringReader) - err := nc.Upload(ctx, ref, stringReadCloser, nil) + _, err := nc.Upload(ctx, ref, stringReadCloser, nil) Expect(err).ToNot(HaveOccurred()) - checkCalled(called, `PUT /apps/sciencemesh/~tester/api/storage/Upload/some/file/path.txt shiny!`) + Expect(len(*called)).To(Equal(2)) + Expect((*called)[0]).To(Equal(`PUT /apps/sciencemesh/~tester/api/storage/Upload/some/file/path.txt shiny!`)) + Expect((*called)[1]).To(Equal(`POST /apps/sciencemesh/~tester/api/storage/GetMD 
{"ref":{"resource_id":{"storage_id":"storage-id","opaque_id":"opaque-id"},"path":"some/file/path.txt"},"mdKeys":[]}`)) }) }) // Download(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) diff --git a/pkg/storage/fs/owncloudsql/upload.go b/pkg/storage/fs/owncloudsql/upload.go index a1fb8eacb7..a77c2e0341 100644 --- a/pkg/storage/fs/owncloudsql/upload.go +++ b/pkg/storage/fs/owncloudsql/upload.go @@ -41,6 +41,7 @@ import ( "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" "github.com/cs3org/reva/v2/pkg/storage/utils/templates" + "github.com/cs3org/reva/v2/pkg/utils" "github.com/google/uuid" "github.com/pkg/errors" "github.com/rs/zerolog/log" @@ -49,10 +50,10 @@ import ( var defaultFilePerm = os.FileMode(0664) -func (fs *owncloudsqlfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, _ storage.UploadFinishedFunc) error { +func (fs *owncloudsqlfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, uff storage.UploadFinishedFunc) (provider.ResourceInfo, error) { upload, err := fs.GetUpload(ctx, ref.GetPath()) if err != nil { - return errors.Wrap(err, "owncloudsql: error retrieving upload") + return provider.ResourceInfo{}, errors.Wrap(err, "owncloudsql: error retrieving upload") } uploadInfo := upload.(*fileUpload) @@ -62,18 +63,18 @@ func (fs *owncloudsqlfs) Upload(ctx context.Context, ref *provider.Reference, r var assembledFile string p, assembledFile, err = fs.chunkHandler.WriteChunk(p, r) if err != nil { - return err + return provider.ResourceInfo{}, err } if p == "" { if err = uploadInfo.Terminate(ctx); err != nil { - return errors.Wrap(err, "owncloudsql: error removing auxiliary files") + return provider.ResourceInfo{}, errors.Wrap(err, "owncloudsql: error removing auxiliary files") } - return errtypes.PartialContent(ref.String()) + return provider.ResourceInfo{}, errtypes.PartialContent(ref.String()) } uploadInfo.info.Storage["InternalDestination"] = p fd, err 
:= os.Open(assembledFile) if err != nil { - return errors.Wrap(err, "owncloudsql: error opening assembled file") + return provider.ResourceInfo{}, errors.Wrap(err, "owncloudsql: error opening assembled file") } defer fd.Close() defer os.RemoveAll(assembledFile) @@ -81,10 +82,45 @@ func (fs *owncloudsqlfs) Upload(ctx context.Context, ref *provider.Reference, r } if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { - return errors.Wrap(err, "owncloudsql: error writing to binary file") + return provider.ResourceInfo{}, errors.Wrap(err, "owncloudsql: error writing to binary file") + } + + if err := uploadInfo.FinishUpload(ctx); err != nil { + return provider.ResourceInfo{}, err + } + + if uff != nil { + info := uploadInfo.info + uploadRef := &provider.Reference{ + ResourceId: &provider.ResourceId{ + StorageId: info.MetaData["providerID"], + SpaceId: info.Storage["SpaceRoot"], + OpaqueId: info.Storage["SpaceRoot"], + }, + Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])), + } + owner, ok := ctxpkg.ContextGetUser(uploadInfo.ctx) + if !ok { + return provider.ResourceInfo{}, errtypes.PreconditionFailed("error getting user from uploadinfo context") + } + uff(owner.Id, uploadRef) + } + + ri := provider.ResourceInfo{ + // fill with at least fileid, mtime and etag + Id: &provider.ResourceId{ + StorageId: uploadInfo.info.MetaData["providerID"], + SpaceId: uploadInfo.info.Storage["StorageId"], + OpaqueId: uploadInfo.info.Storage["fileid"], + }, + Etag: uploadInfo.info.MetaData["etag"], + } + + if mtime, err := utils.MTimeToTS(uploadInfo.info.MetaData["mtime"]); err == nil { + ri.Mtime = &mtime } - return uploadInfo.FinishUpload(ctx) + return ri, nil } // InitiateUpload returns upload ids corresponding to different protocols it supports @@ -402,20 +438,30 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) error { if err != nil { return err } + + if upload.info.MetaData["mtime"] == "" { + upload.info.MetaData["mtime"] = 
fmt.Sprintf("%d", fi.ModTime().Unix()) + } + if upload.info.MetaData["etag"] == "" { + upload.info.MetaData["etag"] = calcEtag(upload.ctx, fi) + } + data := map[string]interface{}{ "path": upload.fs.toDatabasePath(ip), "checksum": fmt.Sprintf("SHA1:%032x MD5:%032x ADLER32:%032x", sha1h, md5h, adler32h), - "etag": calcEtag(upload.ctx, fi), + "etag": upload.info.MetaData["etag"], "size": upload.info.Size, "mimetype": mime.Detect(false, ip), "permissions": perms, "mtime": upload.info.MetaData["mtime"], "storage_mtime": upload.info.MetaData["mtime"], } - _, err = upload.fs.filecache.InsertOrUpdate(ctx, upload.info.Storage["StorageId"], data, false) + var fileid int + fileid, err = upload.fs.filecache.InsertOrUpdate(ctx, upload.info.Storage["StorageId"], data, false) if err != nil { return err } + upload.info.Storage["fileid"] = fmt.Sprintf("%d", fileid) // only delete the upload if it was successfully written to the storage if err := os.Remove(upload.infoPath); err != nil { diff --git a/pkg/storage/fs/s3/s3.go b/pkg/storage/fs/s3/s3.go index 9459f3751c..5921a146be 100644 --- a/pkg/storage/fs/s3/s3.go +++ b/pkg/storage/fs/s3/s3.go @@ -615,36 +615,6 @@ func (fs *s3FS) ListFolder(ctx context.Context, ref *provider.Reference, mdKeys, return finfos, nil } -func (fs *s3FS) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, _ storage.UploadFinishedFunc) error { - log := appctx.GetLogger(ctx) - - fn, err := fs.resolve(ctx, ref) - if err != nil { - return errors.Wrap(err, "error resolving ref") - } - - upParams := &s3manager.UploadInput{ - Bucket: aws.String(fs.config.Bucket), - Key: aws.String(fn), - Body: r, - } - uploader := s3manager.NewUploaderWithClient(fs.client) - result, err := uploader.Upload(upParams) - - if err != nil { - log.Error().Err(err) - if aerr, ok := err.(awserr.Error); ok { - if aerr.Code() == s3.ErrCodeNoSuchBucket { - return errtypes.NotFound(fn) - } - } - return errors.Wrap(err, "s3fs: error creating object "+fn) - } - - 
log.Debug().Interface("result", result) // todo cache etag? - return nil -} - func (fs *s3FS) Download(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) { log := appctx.GetLogger(ctx) diff --git a/pkg/storage/fs/s3/upload.go b/pkg/storage/fs/s3/upload.go index 40d9e690df..bf227583c2 100644 --- a/pkg/storage/fs/s3/upload.go +++ b/pkg/storage/fs/s3/upload.go @@ -20,11 +20,56 @@ package s3 import ( "context" + "io" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/pkg/appctx" "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/storage" + "github.com/pkg/errors" ) +func (fs *s3FS) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, uff storage.UploadFinishedFunc) (provider.ResourceInfo, error) { + log := appctx.GetLogger(ctx) + + fn, err := fs.resolve(ctx, ref) + if err != nil { + return provider.ResourceInfo{}, errors.Wrap(err, "error resolving ref") + } + + upParams := &s3manager.UploadInput{ + Bucket: aws.String(fs.config.Bucket), + Key: aws.String(fn), + Body: r, + } + uploader := s3manager.NewUploaderWithClient(fs.client) + result, err := uploader.Upload(upParams) + + if err != nil { + log.Error().Err(err) + if aerr, ok := err.(awserr.Error); ok { + if aerr.Code() == s3.ErrCodeNoSuchBucket { + return provider.ResourceInfo{}, errtypes.NotFound(fn) + } + } + return provider.ResourceInfo{}, errors.Wrap(err, "s3fs: error creating object "+fn) + } + + log.Debug().Interface("result", result) // todo cache etag? 
+ + // return id, etag and mtime + ri, err := fs.GetMD(ctx, ref, []string{}, []string{"id", "etag", "mtime"}) + if err != nil { + return provider.ResourceInfo{}, err + } + + return *ri, nil +} + // InitiateUpload returns upload ids corresponding to different protocols it supports func (fs *s3FS) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) { return nil, errtypes.NotSupported("op not supported") diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index c355631a2e..7ec5de31dc 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -44,7 +44,7 @@ type FS interface { GetMD(ctx context.Context, ref *provider.Reference, mdKeys, fieldMask []string) (*provider.ResourceInfo, error) ListFolder(ctx context.Context, ref *provider.Reference, mdKeys, fieldMask []string) ([]*provider.ResourceInfo, error) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) - Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, uploadFunc UploadFinishedFunc) error + Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, uploadFunc UploadFinishedFunc) (provider.ResourceInfo, error) Download(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) ListRevisions(ctx context.Context, ref *provider.Reference) ([]*provider.FileVersion, error) DownloadRevision(ctx context.Context, ref *provider.Reference, key string) (io.ReadCloser, error) diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index 0d2f15179b..84bf3a58cf 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -61,10 +61,10 @@ var defaultFilePerm = os.FileMode(0664) // Upload uploads data to the given resource // TODO Upload (and InitiateUpload) needs a way to receive the expected checksum. 
// Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated? -func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, uff storage.UploadFinishedFunc) error { +func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, uff storage.UploadFinishedFunc) (provider.ResourceInfo, error) { upload, err := fs.GetUpload(ctx, ref.GetPath()) if err != nil { - return errors.Wrap(err, "Decomposedfs: error retrieving upload") + return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error retrieving upload") } uploadInfo := upload.(*fileUpload) @@ -74,18 +74,18 @@ func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r i var assembledFile string p, assembledFile, err = fs.chunkHandler.WriteChunk(p, r) if err != nil { - return err + return provider.ResourceInfo{}, err } if p == "" { if err = uploadInfo.Terminate(ctx); err != nil { - return errors.Wrap(err, "ocfs: error removing auxiliary files") + return provider.ResourceInfo{}, errors.Wrap(err, "ocfs: error removing auxiliary files") } - return errtypes.PartialContent(ref.String()) + return provider.ResourceInfo{}, errtypes.PartialContent(ref.String()) } uploadInfo.info.Storage["NodeName"] = p fd, err := os.Open(assembledFile) if err != nil { - return errors.Wrap(err, "Decomposedfs: error opening assembled file") + return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error opening assembled file") } defer fd.Close() defer os.RemoveAll(assembledFile) @@ -93,11 +93,11 @@ func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r i } if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { - return errors.Wrap(err, "Decomposedfs: error writing to binary file") + return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error writing to binary file") } if err := uploadInfo.FinishUpload(ctx); err != nil { - return err + return provider.ResourceInfo{}, err } if uff 
!= nil { @@ -112,12 +112,26 @@ func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r i } owner, ok := ctxpkg.ContextGetUser(uploadInfo.ctx) if !ok { - return errtypes.PreconditionFailed("error getting user from uploadinfo context") + return provider.ResourceInfo{}, errtypes.PreconditionFailed("error getting user from uploadinfo context") } uff(owner.Id, uploadRef) } - return nil + ri := provider.ResourceInfo{ + // fill with at least fileid, mtime and etag + Id: &provider.ResourceId{ + StorageId: uploadInfo.info.MetaData["providerID"], + SpaceId: uploadInfo.info.Storage["SpaceRoot"], + OpaqueId: uploadInfo.info.Storage["NodeId"], + }, + Etag: uploadInfo.info.MetaData["etag"], + } + + if mtime, err := utils.MTimeToTS(uploadInfo.info.MetaData["mtime"]); err == nil { + ri.Mtime = &mtime + } + + return ri, nil } // InitiateUpload returns upload ids corresponding to different protocols it supports @@ -577,20 +591,22 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { return err } + overwrite := n.ID != "" var oldSize uint64 - if n.ID != "" { + if overwrite { + // read size from existing node old, _ := node.ReadNode(ctx, upload.fs.lu, spaceID, n.ID, false) oldSize = uint64(old.Blobsize) + } else { + // create new fileid + n.ID = uuid.New().String() + upload.info.Storage["NodeId"] = n.ID } - _, err = node.CheckQuota(n.SpaceRoot, n.ID != "", oldSize, uint64(fi.Size())) - if err != nil { + if _, err = node.CheckQuota(n.SpaceRoot, overwrite, oldSize, uint64(fi.Size())); err != nil { return err } - if n.ID == "" { - n.ID = uuid.New().String() - } targetPath := n.InternalPath() sublog := appctx.GetLogger(upload.ctx). With(). 
@@ -769,6 +785,13 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { sublog.Err(err).Interface("info", upload.info).Msg("Decomposedfs: could not set mtime metadata") return err } + + } + + // fill metadata with current mtime + if fi, err = os.Stat(targetPath); err == nil { + upload.info.MetaData["mtime"] = fmt.Sprintf("%d.%d", fi.ModTime().Unix(), fi.ModTime().Nanosecond()) + upload.info.MetaData["etag"], _ = node.CalculateEtag(n.ID, fi.ModTime()) } n.Exists = true diff --git a/pkg/storage/utils/decomposedfs/upload_test.go b/pkg/storage/utils/decomposedfs/upload_test.go index 4e0c156762..c8698d93d0 100644 --- a/pkg/storage/utils/decomposedfs/upload_test.go +++ b/pkg/storage/utils/decomposedfs/upload_test.go @@ -242,7 +242,7 @@ var _ = Describe("File uploads", func() { Expect(data).To(Equal([]byte("0123456789"))) }) - err = fs.Upload(ctx, uploadRef, ioutil.NopCloser(bytes.NewReader(fileContent)), nil) + _, err = fs.Upload(ctx, uploadRef, ioutil.NopCloser(bytes.NewReader(fileContent)), nil) Expect(err).ToNot(HaveOccurred()) bs.AssertCalled(GinkgoT(), "Upload", mock.Anything, mock.Anything, mock.Anything) @@ -280,7 +280,7 @@ var _ = Describe("File uploads", func() { Expect(data).To(Equal([]byte(""))) }) - err = fs.Upload(ctx, uploadRef, ioutil.NopCloser(bytes.NewReader(fileContent)), nil) + _, err = fs.Upload(ctx, uploadRef, ioutil.NopCloser(bytes.NewReader(fileContent)), nil) Expect(err).ToNot(HaveOccurred()) bs.AssertCalled(GinkgoT(), "Upload", mock.Anything, mock.Anything, mock.Anything) @@ -300,7 +300,7 @@ var _ = Describe("File uploads", func() { ) uploadRef := &provider.Reference{Path: "/some-non-existent-upload-reference"} - err := fs.Upload(ctx, uploadRef, ioutil.NopCloser(bytes.NewReader(fileContent)), nil) + _, err := fs.Upload(ctx, uploadRef, ioutil.NopCloser(bytes.NewReader(fileContent)), nil) Expect(err).To(HaveOccurred()) diff --git a/pkg/storage/utils/eosfs/upload.go b/pkg/storage/utils/eosfs/upload.go index 
d9adbadb95..c3601d8b56 100644 --- a/pkg/storage/utils/eosfs/upload.go +++ b/pkg/storage/utils/eosfs/upload.go @@ -31,28 +31,28 @@ import ( "github.com/pkg/errors" ) -func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, _ storage.UploadFinishedFunc) error { +func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, uff storage.UploadFinishedFunc) (provider.ResourceInfo, error) { p, err := fs.resolve(ctx, ref) if err != nil { - return errors.Wrap(err, "eos: error resolving reference") + return provider.ResourceInfo{}, errors.Wrap(err, "eos: error resolving reference") } if fs.isShareFolder(ctx, p) { - return errtypes.PermissionDenied("eos: cannot upload under the virtual share folder") + return provider.ResourceInfo{}, errtypes.PermissionDenied("eos: cannot upload under the virtual share folder") } if chunking.IsChunked(p) { var assembledFile string p, assembledFile, err = fs.chunkHandler.WriteChunk(p, r) if err != nil { - return err + return provider.ResourceInfo{}, err } if p == "" { - return errtypes.PartialContent(ref.String()) + return provider.ResourceInfo{}, errtypes.PartialContent(ref.String()) } fd, err := os.Open(assembledFile) if err != nil { - return errors.Wrap(err, "eos: error opening assembled file") + return provider.ResourceInfo{}, errors.Wrap(err, "eos: error opening assembled file") } defer fd.Close() defer os.RemoveAll(assembledFile) @@ -63,16 +63,31 @@ func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadC u, err := getUser(ctx) if err != nil { - return errors.Wrap(err, "eos: no user in ctx") + return provider.ResourceInfo{}, errors.Wrap(err, "eos: no user in ctx") } // We need the auth corresponding to the parent directory // as the file might not exist at the moment auth, err := fs.getUserAuth(ctx, u, path.Dir(fn)) if err != nil { - return err + return provider.ResourceInfo{}, err } - return fs.c.Write(ctx, auth, fn, r) + + if err := fs.c.Write(ctx, auth, 
fn, r); err != nil { + return provider.ResourceInfo{}, err + } + + eosFileInfo, err := fs.c.GetFileInfoByPath(ctx, auth, fn) + if err != nil { + return provider.ResourceInfo{}, err + } + + ri, err := fs.convertToResourceInfo(ctx, eosFileInfo) + if err != nil { + return provider.ResourceInfo{}, err + } + + return *ri, nil } func (fs *eosfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) { diff --git a/pkg/storage/utils/localfs/upload.go b/pkg/storage/utils/localfs/upload.go index fb6eac62fe..2c46ea9ba3 100644 --- a/pkg/storage/utils/localfs/upload.go +++ b/pkg/storage/utils/localfs/upload.go @@ -41,10 +41,10 @@ import ( var defaultFilePerm = os.FileMode(0664) -func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, _ storage.UploadFinishedFunc) error { +func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser, uff storage.UploadFinishedFunc) (provider.ResourceInfo, error) { upload, err := fs.GetUpload(ctx, ref.GetPath()) if err != nil { - return errors.Wrap(err, "localfs: error retrieving upload") + return provider.ResourceInfo{}, errors.Wrap(err, "localfs: error retrieving upload") } uploadInfo := upload.(*fileUpload) @@ -54,18 +54,18 @@ func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.Rea var assembledFile string p, assembledFile, err = fs.chunkHandler.WriteChunk(p, r) if err != nil { - return err + return provider.ResourceInfo{}, err } if p == "" { if err = uploadInfo.Terminate(ctx); err != nil { - return errors.Wrap(err, "localfs: error removing auxiliary files") + return provider.ResourceInfo{}, errors.Wrap(err, "localfs: error removing auxiliary files") } - return errtypes.PartialContent(ref.String()) + return provider.ResourceInfo{}, errtypes.PartialContent(ref.String()) } uploadInfo.info.Storage["InternalDestination"] = p fd, err := os.Open(assembledFile) if err != nil { - return 
errors.Wrap(err, "localfs: error opening assembled file") + return provider.ResourceInfo{}, errors.Wrap(err, "localfs: error opening assembled file") } defer fd.Close() defer os.RemoveAll(assembledFile) @@ -73,10 +73,37 @@ func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.Rea } if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil { - return errors.Wrap(err, "localfs: error writing to binary file") + return provider.ResourceInfo{}, errors.Wrap(err, "localfs: error writing to binary file") + } + + if err := uploadInfo.FinishUpload(ctx); err != nil { + return provider.ResourceInfo{}, err + } + + if uff != nil { + info := uploadInfo.info + uploadRef := &provider.Reference{ + ResourceId: &provider.ResourceId{ + StorageId: info.MetaData["providerID"], + SpaceId: info.Storage["SpaceRoot"], + OpaqueId: info.Storage["SpaceRoot"], + }, + Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])), + } + owner, ok := ctxpkg.ContextGetUser(uploadInfo.ctx) + if !ok { + return provider.ResourceInfo{}, errtypes.PreconditionFailed("error getting user from uploadinfo context") + } + uff(owner.Id, uploadRef) + } + + // return id, etag and mtime + ri, err := fs.GetMD(ctx, ref, []string{}, []string{"id", "etag", "mtime"}) + if err != nil { + return provider.ResourceInfo{}, err } - return uploadInfo.FinishUpload(ctx) + return *ri, nil } // InitiateUpload returns upload ids corresponding to different protocols it supports diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 821d131550..f8f1cde42d 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -28,6 +28,7 @@ import ( "path" "path/filepath" "regexp" + "strconv" "strings" "time" @@ -159,6 +160,18 @@ func TSNow() *types.Timestamp { } } +// MTimeToTS converts a string in the form "." 
separated seconds and nanoseconds into a CS3 Timestamp +func MTimeToTS(v string) (ts types.Timestamp, err error) { + p := strings.SplitN(v, ".", 2) + var sec, nsec uint64 + if sec, err = strconv.ParseUint(p[0], 10, 64); err == nil { + if len(p) > 1 { + nsec, err = strconv.ParseUint(p[1], 10, 32) + } + } + return types.Timestamp{Seconds: sec, Nanos: uint32(nsec)}, err +} + // ExtractGranteeID returns the ID, user or group, set in the GranteeId object func ExtractGranteeID(grantee *provider.Grantee) (*userpb.UserId, *grouppb.GroupId) { switch t := grantee.Id.(type) { diff --git a/tests/helpers/helpers.go b/tests/helpers/helpers.go index b4f5c7d3e1..2b657da6e2 100644 --- a/tests/helpers/helpers.go +++ b/tests/helpers/helpers.go @@ -62,6 +62,6 @@ func Upload(ctx context.Context, fs storage.FS, ref *provider.Reference, content return errors.New("simple upload method not available") } uploadRef := &provider.Reference{Path: "/" + uploadID} - err = fs.Upload(ctx, uploadRef, ioutil.NopCloser(bytes.NewReader(content)), nil) + _, err = fs.Upload(ctx, uploadRef, ioutil.NopCloser(bytes.NewReader(content)), nil) return err }