Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Datatx createtransfershare #1725

Merged
merged 10 commits into from
May 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/datatx-createtransfer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Create transfer type share

`transfer-create` creates a share of type transfer.

https://github.com/cs3org/reva/pull/1725
4 changes: 2 additions & 2 deletions cmd/reva/ocm-share-list-received.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ func ocmShareListReceivedCommand() *command {
t := table.NewWriter()
t.SetOutputMirror(os.Stdout)
t.AppendHeader(table.Row{"#", "Owner.Idp", "Owner.OpaqueId", "ResourceId", "Permissions", "Type",
"Grantee.Idp", "Grantee.OpaqueId", "Created", "Updated", "State"})
"Grantee.Idp", "Grantee.OpaqueId", "Created", "Updated", "State", "ShareType"})
for _, s := range shareRes.Shares {
t.AppendRows([]table.Row{
{s.Share.Id.OpaqueId, s.Share.Owner.Idp, s.Share.Owner.OpaqueId, s.Share.ResourceId.String(),
s.Share.Permissions.String(), s.Share.Grantee.Type.String(), s.Share.Grantee.GetUserId().Idp,
s.Share.Grantee.GetUserId().OpaqueId, time.Unix(int64(s.Share.Ctime.Seconds), 0),
time.Unix(int64(s.Share.Mtime.Seconds), 0), s.State.String()},
time.Unix(int64(s.Share.Mtime.Seconds), 0), s.State.String(), s.Share.ShareType.String()},
})
}
t.Render()
Expand Down
4 changes: 2 additions & 2 deletions cmd/reva/ocm-share-list.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ func ocmShareListCommand() *command {
t := table.NewWriter()
t.SetOutputMirror(os.Stdout)
t.AppendHeader(table.Row{"#", "Owner.Idp", "Owner.OpaqueId", "ResourceId", "Permissions", "Type",
"Grantee.Idp", "Grantee.OpaqueId", "Created", "Updated"})
"ShareType", "Grantee.Idp", "Grantee.OpaqueId", "Created", "Updated"})

for _, s := range shareRes.Shares {
t.AppendRows([]table.Row{
{s.Id.OpaqueId, s.Owner.Idp, s.Owner.OpaqueId, s.ResourceId.String(), s.Permissions.String(),
s.Grantee.Type.String(), s.Grantee.GetUserId().Idp, s.Grantee.GetUserId().OpaqueId,
s.Grantee.Type.String(), s.ShareType.String(), s.Grantee.GetUserId().Idp, s.Grantee.GetUserId().OpaqueId,
time.Unix(int64(s.Ctime.Seconds), 0), time.Unix(int64(s.Mtime.Seconds), 0)},
})
}
Expand Down
129 changes: 120 additions & 9 deletions cmd/reva/transfer-create.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,153 @@
package main

import (
"errors"
"io"
"os"
"strconv"
"strings"
"time"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
invitepb "github.com/cs3org/go-cs3apis/cs3/ocm/invite/v1beta1"
ocmprovider "github.com/cs3org/go-cs3apis/cs3/ocm/provider/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
tx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1"
ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/jedib0t/go-pretty/table"
"github.com/pkg/errors"
)

func transferCreateCommand() *command {
cmd := newCommand("transfer-create")
cmd.Description = func() string { return "create transfer between 2 remotes" }
cmd.Usage = func() string { return "Usage: transfer-create [-flags]" }
cmd.Description = func() string { return "create transfer between 2 sites" }
cmd.Usage = func() string { return "Usage: transfer-create [-flags] <path>" }
grantee := cmd.String("grantee", "", "the grantee, receiver of the transfer")
granteeType := cmd.String("granteeType", "user", "the grantee type, one of: user, group")
idp := cmd.String("idp", "", "the idp of the grantee, default to same idp as the user triggering the action")

cmd.Action = func(w ...io.Writer) error {
// validate flags
if cmd.NArg() < 1 {
return errors.New("Invalid arguments: " + cmd.Usage())
}

if *grantee == "" {
return errors.New("Grantee cannot be empty: use -grantee flag\n" + cmd.Usage())
}
if *idp == "" {
return errors.New("Idp cannot be empty: use -idp flag\n" + cmd.Usage())
}

// the resource to transfer; the path
fn := cmd.Args()[0]

ctx := getAuthContext()
client, err := getClient()
if err != nil {
return err
}

transferRequest := &tx.CreateTransferRequest{}
// check if invitation has been accepted
acceptedUserRes, err := client.GetAcceptedUser(ctx, &invitepb.GetAcceptedUserRequest{
RemoteUserId: &userpb.UserId{OpaqueId: *grantee, Idp: *idp},
})
if err != nil {
return err
}
if acceptedUserRes.Status.Code != rpc.Code_CODE_OK {
return formatError(acceptedUserRes.Status)
}

transferResponse, err := client.CreateTransfer(ctx, transferRequest)
// verify resource stats
statReq := &provider.StatRequest{
Ref: &provider.Reference{
Spec: &provider.Reference_Path{
Path: fn,
},
},
}
statRes, err := client.Stat(ctx, statReq)
if err != nil {
return err
}
if transferResponse.Status.Code != rpc.Code_CODE_OK {
return formatError(transferResponse.Status)
if statRes.Status.Code != rpc.Code_CODE_OK {
return formatError(statRes.Status)
}

providerInfoResp, err := client.GetInfoByDomain(ctx, &ocmprovider.GetInfoByDomainRequest{
Domain: *idp,
})
if err != nil {
return err
}

resourcePermissions, pint, err := getOCMSharePerm(editorPermission)
if err != nil {
return err
}

gt := provider.GranteeType_GRANTEE_TYPE_USER
if strings.ToLower(*granteeType) == "group" {
gt = provider.GranteeType_GRANTEE_TYPE_GROUP
}

createShareReq := &ocm.CreateOCMShareRequest{
Opaque: &types.Opaque{
Map: map[string]*types.OpaqueEntry{
"permissions": &types.OpaqueEntry{
Decoder: "plain",
Value: []byte(strconv.Itoa(pint)),
},
"name": &types.OpaqueEntry{
Decoder: "plain",
Value: []byte(statRes.Info.Path),
},
"protocol": &types.OpaqueEntry{
Decoder: "plain",
Value: []byte("datatx"),
},
},
},
ResourceId: statRes.Info.Id,
Grant: &ocm.ShareGrant{
Grantee: &provider.Grantee{
Type: gt,
Id: &provider.Grantee_UserId{
UserId: &userpb.UserId{
Idp: *idp,
OpaqueId: *grantee,
},
},
},
Permissions: resourcePermissions,
},
RecipientMeshProvider: providerInfoResp.ProviderInfo,
}

createShareResponse, err := client.CreateOCMShare(ctx, createShareReq)
if err != nil {
return err
}
if createShareResponse.Status.Code != rpc.Code_CODE_OK {
if createShareResponse.Status.Code == rpc.Code_CODE_NOT_FOUND {
return formatError(statRes.Status)
}
return err
}

t := table.NewWriter()
t.SetOutputMirror(os.Stdout)
t.AppendHeader(table.Row{"#", "Owner.Idp", "Owner.OpaqueId", "ResourceId", "Permissions", "Type", "Grantee.Idp", "Grantee.OpaqueId", "ShareType", "Created", "Updated"})

s := createShareResponse.Share
t.AppendRows([]table.Row{
{s.Id.OpaqueId, s.Owner.Idp, s.Owner.OpaqueId, s.ResourceId.String(), s.Permissions.String(),
s.Grantee.Type.String(), s.Grantee.GetUserId().Idp, s.Grantee.GetUserId().OpaqueId, s.ShareType.String(),
time.Unix(int64(s.Ctime.Seconds), 0), time.Unix(int64(s.Mtime.Seconds), 0)},
})
t.Render()
return nil
}

return cmd
}
13 changes: 9 additions & 4 deletions internal/grpc/services/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ type config struct {
TransferExpires int64 `mapstructure:"transfer_expires"`
TokenManager string `mapstructure:"token_manager"`
// ShareFolder is the location where to create shares in the recipient's storage provider.
ShareFolder string `mapstructure:"share_folder"`
HomeMapping string `mapstructure:"home_mapping"`
TokenManagers map[string]map[string]interface{} `mapstructure:"token_managers"`
EtagCacheTTL int `mapstructure:"etag_cache_ttl"`
ShareFolder string `mapstructure:"share_folder"`
DataTransfersFolder string `mapstructure:"data_transfers_folder"`
HomeMapping string `mapstructure:"home_mapping"`
TokenManagers map[string]map[string]interface{} `mapstructure:"token_managers"`
EtagCacheTTL int `mapstructure:"etag_cache_ttl"`
}

// sets defaults
Expand All @@ -77,6 +78,10 @@ func (c *config) init() {

c.ShareFolder = strings.Trim(c.ShareFolder, "/")

if c.DataTransfersFolder == "" {
c.DataTransfersFolder = "Data-Transfers"
}

if c.TokenManager == "" {
c.TokenManager = "jwt"
}
Expand Down
44 changes: 33 additions & 11 deletions internal/grpc/services/gateway/ocmshareprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive
panic("gateway: error updating a received share: the share is nil")
}

createRefStatus, err := s.createWebdavReference(ctx, share.Share)
createRefStatus, err := s.createOCMReference(ctx, share.Share)
return &ocm.UpdateReceivedOCMShareResponse{
Status: createRefStatus,
}, err
Expand Down Expand Up @@ -291,7 +291,7 @@ func (s *svc) GetReceivedOCMShare(ctx context.Context, req *ocm.GetReceivedOCMSh
return res, nil
}

func (s *svc) createWebdavReference(ctx context.Context, share *ocm.Share) (*rpc.Status, error) {
func (s *svc) createOCMReference(ctx context.Context, share *ocm.Share) (*rpc.Status, error) {

log := appctx.GetLogger(ctx)

Expand All @@ -314,17 +314,39 @@ func (s *svc) createWebdavReference(ctx context.Context, share *ocm.Share) (*rpc
return status.NewInternal(ctx, err, "error updating received share"), nil
}

// reference path is the home path + some name on the corresponding
// mesh provider (/home/MyShares/x)
// It is the responsibility of the gateway to resolve these references and merge the response back
// from the main request.
refPath := path.Join(homeRes.Path, s.c.ShareFolder, path.Base(share.Name))
log.Info().Msg("mount path will be:" + refPath)
var refPath, targetURI string
if share.ShareType == ocm.Share_SHARE_TYPE_TRANSFER {
createTransferDir, err := s.CreateContainer(ctx, &provider.CreateContainerRequest{
Ref: &provider.Reference{
Spec: &provider.Reference_Path{
Path: path.Join(homeRes.Path, s.c.DataTransfersFolder),
},
},
})
if err != nil {
return status.NewInternal(ctx, err, "error creating transfers directory"), nil
}
if createTransferDir.Status.Code != rpc.Code_CODE_OK && createTransferDir.Status.Code != rpc.Code_CODE_ALREADY_EXISTS {
err := status.NewErrorFromCode(createTransferDir.Status.GetCode(), "gateway")
return status.NewInternal(ctx, err, "error creating transfers directory"), nil
}

createRefReq := &provider.CreateReferenceRequest{
Path: refPath,
refPath = path.Join(homeRes.Path, s.c.DataTransfersFolder, path.Base(share.Name))
targetURI = fmt.Sprintf("datatx://%s@%s?name=%s", token, share.Creator.Idp, share.Name)
} else {
// reference path is the home path + some name on the corresponding
// mesh provider (/home/MyShares/x)
// It is the responsibility of the gateway to resolve these references and merge the response back
// from the main request.
refPath = path.Join(homeRes.Path, s.c.ShareFolder, path.Base(share.Name))
// webdav is the scheme, token@host the opaque part and the share name the query of the URL.
TargetUri: fmt.Sprintf("webdav://%s@%s?name=%s", token, share.Creator.Idp, share.Name),
targetURI = fmt.Sprintf("webdav://%s@%s?name=%s", token, share.Creator.Idp, share.Name)
}

log.Info().Msg("mount path will be:" + refPath)
createRefReq := &provider.CreateReferenceRequest{
Path: refPath,
TargetUri: targetURI,
}

c, err := s.findByPath(ctx, refPath)
Expand Down
10 changes: 9 additions & 1 deletion internal/grpc/services/ocmcore/ocmcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,15 @@ func (s *service) CreateOCMCoreShare(ctx context.Context, req *ocmcore.CreateOCM
},
}

share, err := s.sm.Share(ctx, resource, grant, req.Name, nil, "", req.Owner, token, ocm.Share_SHARE_TYPE_REGULAR)
var shareType ocm.Share_ShareType
switch req.Protocol.Name {
case "datatx":
shareType = ocm.Share_SHARE_TYPE_TRANSFER
default:
shareType = ocm.Share_SHARE_TYPE_REGULAR
}

share, err := s.sm.Share(ctx, resource, grant, req.Name, nil, "", req.Owner, token, shareType)
if err != nil {
return &ocmcore.CreateOCMCoreShareResponse{
Status: status.NewInternal(ctx, err, "error creating ocm core share"),
Expand Down
19 changes: 18 additions & 1 deletion internal/grpc/services/ocmshareprovider/ocmshareprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,24 @@ func (s *service) CreateOCMShare(ctx context.Context, req *ocm.CreateOCMShareReq
}, nil
}

share, err := s.sm.Share(ctx, req.ResourceId, req.Grant, name, req.RecipientMeshProvider, permissions, nil, "", ocm.Share_SHARE_TYPE_REGULAR)
// discover share type
sharetype := ocm.Share_SHARE_TYPE_REGULAR
protocol, ok := req.Opaque.Map["protocol"]
if ok {
switch protocol.Decoder {
case "plain":
if string(protocol.Value) == "datatx" {
sharetype = ocm.Share_SHARE_TYPE_TRANSFER
}
default:
err := errors.New("protocol decoder not recognized")
return &ocm.CreateOCMShareResponse{
Status: status.NewInternal(ctx, err, "error creating share"),
}, nil
}
}

share, err := s.sm.Share(ctx, req.ResourceId, req.Grant, name, req.RecipientMeshProvider, permissions, nil, "", sharetype)
if err != nil {
return &ocm.CreateOCMShareResponse{
Status: status.NewInternal(ctx, err, "error creating share"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func (s *service) CreateContainer(ctx context.Context, req *provider.CreateConta
case errtypes.IsNotFound:
st = status.NewNotFound(ctx, "path not found when creating container")
case errtypes.AlreadyExists:
st = status.NewInternal(ctx, err, "error: container already exists")
st = status.NewAlreadyExists(ctx, err, "container already exists")
case errtypes.PermissionDenied:
st = status.NewPermissionDenied(ctx, err, "permission denied")
default:
Expand Down
44 changes: 32 additions & 12 deletions pkg/ocm/share/manager/json/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,20 +269,40 @@ func (m *mgr) Share(ctx context.Context, md *provider.ResourceId, g *ocm.ShareGr
}

if isOwnersMeshProvider {
token, ok := tokenpkg.ContextGetToken(ctx)
if !ok {
return nil, errors.New("Could not get token from context")
}
var protocol []byte
if st == ocm.Share_SHARE_TYPE_TRANSFER {
protocol, err = json.Marshal(
map[string]interface{}{
"name": "datatx",
"options": map[string]string{
"permissions": pm,
"token": token,
},
},
)
if err != nil {
err = errors.Wrap(err, "error marshalling protocol data")
return nil, err
}

// Call the remote provider's CreateOCMCoreShare method
protocol, err := json.Marshal(
map[string]interface{}{
"name": "webdav",
"options": map[string]string{
"permissions": pm,
"token": tokenpkg.ContextMustGetToken(ctx),
} else {
protocol, err = json.Marshal(
map[string]interface{}{
"name": "webdav",
"options": map[string]string{
"permissions": pm,
"token": tokenpkg.ContextMustGetToken(ctx),
},
},
},
)
if err != nil {
err = errors.Wrap(err, "error marshalling protocol data")
return nil, err
)
if err != nil {
err = errors.Wrap(err, "error marshalling protocol data")
return nil, err
}
}

requestBody := url.Values{
Expand Down
Loading