Skip to content

Commit

Permalink
Added support for HTTP TPC
Browse files Browse the repository at this point in the history
  • Loading branch information
dynamic-entropy committed Aug 19, 2021
1 parent d3a946e commit 56b2105
Show file tree
Hide file tree
Showing 3 changed files with 482 additions and 34 deletions.
10 changes: 10 additions & 0 deletions changelog/unreleased/http-tpc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Enhancement: Add support for HTTP TPC

We have added support for Http Third Party Copy.
This allows remote data transfer between storages managed by:

1. Two different reva servers
2. A reva server and a grid site server

https://github.com/cs3org/reva/issues/1787
https://github.com/cs3org/reva/pull/2007
79 changes: 45 additions & 34 deletions internal/http/services/owncloud/ocdav/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,52 +52,63 @@ func (s *svc) handlePathCopy(w http.ResponseWriter, r *http.Request, ns string)
ctx, span := rtrace.Provider.Tracer("reva").Start(r.Context(), "copy")
defer span.End()

src := path.Join(ns, r.URL.Path)
dst, err := extractDestination(r)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
for _, r := range nameRules {
if !r.Test(dst) {
// If is a third party copy
if r.Header.Get("ThirdPartyCopy") == "T" {

if r.Header.Get("Source") == "" { //Push Mode
s.handleTPCPush(ctx, w, r, ns)
} else { // Pull Mode
s.handleTPCPull(ctx, w, r, ns)
}
} else { // is a local copy
src := path.Join(ns, r.URL.Path)
dst, err := extractDestination(r)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
}
dst = path.Join(ns, dst)

sublog := appctx.GetLogger(ctx).With().Str("src", src).Str("dst", dst).Logger()
for _, r := range nameRules {
if !r.Test(dst) {
w.WriteHeader(http.StatusBadRequest)
return
}
}

srcRef := &provider.Reference{Path: src}
dst = path.Join(ns, dst)

// check dst exists
dstRef := &provider.Reference{Path: dst}
sublog := appctx.GetLogger(ctx).With().Str("src", src).Str("dst", dst).Logger()

intermediateDirRefFunc := func() (*provider.Reference, *rpc.Status, error) {
intermediateDir := path.Dir(dst)
ref := &provider.Reference{Path: intermediateDir}
return ref, &rpc.Status{Code: rpc.Code_CODE_OK}, nil
}
srcRef := &provider.Reference{Path: src}

cp := s.prepareCopy(ctx, w, r, srcRef, dstRef, intermediateDirRefFunc, &sublog)
if cp == nil {
return
}
// check dst exists
dstRef := &provider.Reference{Path: dst}

client, err := s.getClient()
if err != nil {
sublog.Error().Err(err).Msg("error getting grpc client")
w.WriteHeader(http.StatusInternalServerError)
return
}
intermediateDirRefFunc := func() (*provider.Reference, *rpc.Status, error) {
intermediateDir := path.Dir(dst)
ref := &provider.Reference{Path: intermediateDir}
return ref, &rpc.Status{Code: rpc.Code_CODE_OK}, nil
}

if err := s.executePathCopy(ctx, client, w, r, cp); err != nil {
sublog.Error().Err(err).Str("depth", cp.depth).Msg("error executing path copy")
w.WriteHeader(http.StatusInternalServerError)
cp := s.prepareCopy(ctx, w, r, srcRef, dstRef, intermediateDirRefFunc, &sublog)
if cp == nil {
return
}

client, err := s.getClient()
if err != nil {
sublog.Error().Err(err).Msg("error getting grpc client")
w.WriteHeader(http.StatusInternalServerError)
return
}

if err := s.executePathCopy(ctx, client, w, r, cp); err != nil {
sublog.Error().Err(err).Str("depth", cp.depth).Msg("error executing path copy")
w.WriteHeader(http.StatusInternalServerError)
}
w.WriteHeader(cp.successCode)
}
w.WriteHeader(cp.successCode)
}

func (s *svc) executePathCopy(ctx context.Context, client gateway.GatewayAPIClient, w http.ResponseWriter, r *http.Request, cp *copy) error {
log := appctx.GetLogger(ctx)
log.Debug().Str("src", cp.sourceInfo.Path).Str("dst", cp.destination.Path).Msg("descending")
Expand Down
Loading

0 comments on commit 56b2105

Please sign in to comment.