Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle non-TUS uploads called directly with upload path #1314

Merged
merged 4 commits into from
Nov 11, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/unreleased/uploads-refactor.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ refactors that workflow to accept incoming requests following the tus protocol
while using simpler transmission internally.

https://github.com/cs3org/reva/pull/1285
https://github.com/cs3org/reva/pull/1314
2 changes: 1 addition & 1 deletion internal/http/services/owncloud/ocdav/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
w.Header().Set("ETag", info.Etag)
w.Header().Set("OC-FileId", wrapResourceID(info.Id))
w.Header().Set("OC-ETag", info.Etag)
t := utils.TSToTime(info.Mtime)
t := utils.TSToTime(info.Mtime).UTC()
lastModifiedString := t.Format(time.RFC1123Z)
w.Header().Set("Last-Modified", lastModifiedString)
w.Header().Set("Content-Length", strconv.FormatUint(info.Size, 10))
Expand Down
2 changes: 1 addition & 1 deletion internal/http/services/owncloud/ocdav/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *svc) handleHead(w http.ResponseWriter, r *http.Request, ns string) {
w.Header().Set("ETag", info.Etag)
w.Header().Set("OC-FileId", wrapResourceID(info.Id))
w.Header().Set("OC-ETag", info.Etag)
t := utils.TSToTime(info.Mtime)
t := utils.TSToTime(info.Mtime).UTC()
lastModifiedString := t.Format(time.RFC1123Z)
w.Header().Set("Last-Modified", lastModifiedString)
w.WriteHeader(http.StatusOK)
Expand Down
2 changes: 1 addition & 1 deletion internal/http/services/owncloud/ocdav/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (s *svc) handlePutHelper(w http.ResponseWriter, r *http.Request, content io
w.Header().Set("ETag", newInfo.Etag)
w.Header().Set("OC-FileId", wrapResourceID(newInfo.Id))
w.Header().Set("OC-ETag", newInfo.Etag)
t := utils.TSToTime(newInfo.Mtime)
t := utils.TSToTime(newInfo.Mtime).UTC()
lastModifiedString := t.Format(time.RFC1123Z)
w.Header().Set("Last-Modified", lastModifiedString)

Expand Down
2 changes: 1 addition & 1 deletion internal/http/services/owncloud/ocdav/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (s *svc) handleTusPost(w http.ResponseWriter, r *http.Request, ns string) {
w.Header().Set("OC-FileId", wrapResourceID(info.Id))
w.Header().Set("OC-ETag", info.Etag)
w.Header().Set("ETag", info.Etag)
t := utils.TSToTime(info.Mtime)
t := utils.TSToTime(info.Mtime).UTC()
lastModifiedString := t.Format(time.RFC1123Z)
w.Header().Set("Last-Modified", lastModifiedString)
}
Expand Down
28 changes: 21 additions & 7 deletions pkg/storage/fs/ocis/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package ocis

import (
"bytes"
"context"
"encoding/json"
"io"
Expand All @@ -44,15 +45,28 @@ var defaultFilePerm = os.FileMode(0664)
func (fs *ocisfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) (err error) {
upload, err := fs.GetUpload(ctx, ref.GetPath())
if err != nil {
return errors.Wrap(err, "ocisfs: error retrieving upload")
// Upload corresponding to this ID was not found.
// Assume that this corresponds to the resource path to which the file has to be uploaded.
buf := &bytes.Buffer{}
length, err := io.Copy(buf, r)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this buffers the whole file in memory before writing it to disk.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes more sense to revert the changes and use the lookup of the node before ... I can look into making the chunking work.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and this reads consumes the body, which makes the subsequent writeChunk not read anything ...

Copy link
Contributor Author

@ishank011 ishank011 Nov 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@butonic the refactoring concerns less the chunking part and more that we can support both TUS and non-TUS uploads out of the box without ugly config changes.

This step was just to get the length of the upload since that is not explicitly provided. One way to get around this is to set SizeIsDeferred to true, since we never really use it for the normal uploads.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes ... and Upload() was implementing the non tus based upload. For tus you need InitiateUpload and UseIn, which you correctly check in the dataprovider ... why did you need to touch the Upload implementation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The latest commit gets rid of the buffer issue.

The aim was to have uniform implementation for all types of uploads. Calling InitiateUpload for TUS uploads and not doing so for others didn't seem ideal. Also, since all the file systems did in fact have the UseIn method, the PUT calls were handled by a wrapper around the tus client which was also a hack.

func (s *svc) doTusPut(w http.ResponseWriter, r *http.Request) {

We wanted to add support for multiple protocols as started in #828, so all of these should ideally have the same workflow: Call InitiateUpload -> Send PUT/POST call to the returned URL

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

testing the PR

if err != nil {
return err
}
uploadID, err := fs.InitiateUpload(ctx, ref, length, nil)
if err != nil {
return err
}
if upload, err = fs.GetUpload(ctx, uploadID); err != nil {
return errors.Wrap(err, "ocisfs: error retrieving upload")
}
}

uploadInfo := upload.(*fileUpload)

p := uploadInfo.info.Storage["NodeName"]
ok, err := chunking.IsChunked(p)
if err != nil {
return errors.Wrap(err, "ocfs: error checking path")
return errors.Wrap(err, "ocisfs: error checking path")
}
if ok {
var assembledFile string
Expand All @@ -69,7 +83,7 @@ func (fs *ocisfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read
uploadInfo.info.Storage["NodeName"] = p
fd, err := os.Open(assembledFile)
if err != nil {
return errors.Wrap(err, "eos: error opening assembled file")
return errors.Wrap(err, "ocisfs: error opening assembled file")
}
defer fd.Close()
defer os.RemoveAll(assembledFile)
Expand Down Expand Up @@ -432,7 +446,7 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) {
Str("link", link).
Msg("ocisfs: child name link has wrong target id, repairing")

if err = os.Remove(childNameLink); err != nil {
if err = os.RemoveAll(childNameLink); err != nil {
return errors.Wrap(err, "ocisfs: could not remove symlink child entry")
}
}
Expand All @@ -443,7 +457,7 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) {
}

// only delete the upload if it was successfully written to the storage
if err = os.Remove(upload.infoPath); err != nil {
if err = os.RemoveAll(upload.infoPath); err != nil {
if !os.IsNotExist(err) {
log.Err(err).Interface("info", upload.info).Msg("ocisfs: could not delete upload info")
return
Expand Down Expand Up @@ -474,12 +488,12 @@ func (fs *ocisfs) AsTerminatableUpload(upload tusd.Upload) tusd.TerminatableUplo

// Terminate terminates the upload
func (upload *fileUpload) Terminate(ctx context.Context) error {
if err := os.Remove(upload.infoPath); err != nil {
if err := os.RemoveAll(upload.infoPath); err != nil {
if !os.IsNotExist(err) {
return err
}
}
if err := os.Remove(upload.binPath); err != nil {
if err := os.RemoveAll(upload.binPath); err != nil {
if !os.IsNotExist(err) {
return err
}
Expand Down
24 changes: 19 additions & 5 deletions pkg/storage/fs/owncloud/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package owncloud

import (
"bytes"
"context"
"encoding/json"
"io"
Expand All @@ -44,7 +45,20 @@ var defaultFilePerm = os.FileMode(0664)
func (fs *ocfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error {
upload, err := fs.GetUpload(ctx, ref.GetPath())
if err != nil {
return errors.Wrap(err, "ocfs: error retrieving upload")
// Upload corresponding to this ID was not found.
// Assume that this corresponds to the resource path to which the file has to be uploaded.
buf := &bytes.Buffer{}
length, err := io.Copy(buf, r)
if err != nil {
return err
}
uploadID, err := fs.InitiateUpload(ctx, ref, length, nil)
if err != nil {
return err
}
if upload, err = fs.GetUpload(ctx, uploadID); err != nil {
return errors.Wrap(err, "ocfs: error retrieving upload")
}
}

uploadInfo := upload.(*fileUpload)
Expand All @@ -69,7 +83,7 @@ func (fs *ocfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCl
uploadInfo.info.Storage["InternalDestination"] = p
fd, err := os.Open(assembledFile)
if err != nil {
return errors.Wrap(err, "eos: error opening assembled file")
return errors.Wrap(err, "ocfs: error opening assembled file")
}
defer fd.Close()
defer os.RemoveAll(assembledFile)
Expand Down Expand Up @@ -383,7 +397,7 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) error {
}

// only delete the upload if it was successfully written to the storage
if err := os.Remove(upload.infoPath); err != nil {
if err := os.RemoveAll(upload.infoPath); err != nil {
if !os.IsNotExist(err) {
log.Err(err).Interface("info", upload.info).Msg("ocfs: could not delete upload info")
return err
Expand Down Expand Up @@ -412,12 +426,12 @@ func (fs *ocfs) AsTerminatableUpload(upload tusd.Upload) tusd.TerminatableUpload

// Terminate terminates the upload
func (upload *fileUpload) Terminate(ctx context.Context) error {
if err := os.Remove(upload.infoPath); err != nil {
if err := os.RemoveAll(upload.infoPath); err != nil {
if !os.IsNotExist(err) {
ishank011 marked this conversation as resolved.
Show resolved Hide resolved
return err
}
}
if err := os.Remove(upload.binPath); err != nil {
if err := os.RemoveAll(upload.binPath); err != nil {
if !os.IsNotExist(err) {
return err
}
Expand Down
30 changes: 22 additions & 8 deletions pkg/storage/utils/localfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package localfs

import (
"bytes"
"context"
"encoding/json"
"io"
Expand All @@ -42,15 +43,28 @@ var defaultFilePerm = os.FileMode(0664)
func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error {
upload, err := fs.GetUpload(ctx, ref.GetPath())
if err != nil {
return errors.Wrap(err, "ocisfs: error retrieving upload")
// Upload corresponding to this ID was not found.
// Assume that this corresponds to the resource path to which the file has to be uploaded.
buf := &bytes.Buffer{}
length, err := io.Copy(buf, r)
if err != nil {
return err
}
uploadID, err := fs.InitiateUpload(ctx, ref, length, nil)
if err != nil {
return err
}
if upload, err = fs.GetUpload(ctx, uploadID); err != nil {
return errors.Wrap(err, "localfs: error retrieving upload")
}
}

uploadInfo := upload.(*fileUpload)

p := uploadInfo.info.Storage["InternalDestination"]
ok, err := chunking.IsChunked(p)
if err != nil {
return errors.Wrap(err, "ocfs: error checking path")
return errors.Wrap(err, "localfs: error checking path")
}
if ok {
var assembledFile string
Expand All @@ -60,22 +74,22 @@ func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.Rea
}
if p == "" {
if err = uploadInfo.Terminate(ctx); err != nil {
return errors.Wrap(err, "ocfs: error removing auxiliary files")
return errors.Wrap(err, "localfs: error removing auxiliary files")
}
return errtypes.PartialContent(ref.String())
}
uploadInfo.info.Storage["InternalDestination"] = p
fd, err := os.Open(assembledFile)
if err != nil {
return errors.Wrap(err, "eos: error opening assembled file")
return errors.Wrap(err, "localfs: error opening assembled file")
}
defer fd.Close()
defer os.RemoveAll(assembledFile)
r = fd
}

if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil {
return errors.Wrap(err, "ocisfs: error writing to binary file")
return errors.Wrap(err, "localfs: error writing to binary file")
}

return uploadInfo.FinishUpload(ctx)
Expand Down Expand Up @@ -330,7 +344,7 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) error {
}

// only delete the upload if it was successfully written to the fs
if err := os.Remove(upload.infoPath); err != nil {
if err := os.RemoveAll(upload.infoPath); err != nil {
log := appctx.GetLogger(ctx)
log.Err(err).Interface("info", upload.info).Msg("localfs: could not delete upload info")
}
Expand All @@ -352,10 +366,10 @@ func (fs *localfs) AsTerminatableUpload(upload tusd.Upload) tusd.TerminatableUpl

// Terminate terminates the upload
func (upload *fileUpload) Terminate(ctx context.Context) error {
if err := os.Remove(upload.infoPath); err != nil {
if err := os.RemoveAll(upload.infoPath); err != nil {
return err
}
if err := os.Remove(upload.binPath); err != nil {
if err := os.RemoveAll(upload.binPath); err != nil {
return err
}
return nil
Expand Down