Skip to content

Commit

Permalink
only root nodes may have an owner
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
  • Loading branch information
butonic committed Feb 18, 2022
1 parent c50c6b6 commit 0c50ef0
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 227 deletions.
10 changes: 3 additions & 7 deletions pkg/storage/utils/decomposedfs/grants.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,12 @@ func (fs *Decomposedfs) AddGrant(ctx context.Context, ref *provider.Reference, g
return err
}

owner, err := node.Owner()
if err != nil {
return err
}

owner := node.Owner()
// If the owner is empty and there are no grantees then we are dealing with a just created project space.
// In this case we don't need to check for permissions and just add the grant since this will be the project
// manager.
// When the owner is empty but grants are set then we do want to check the grants.
if !(len(grantees) == 0 && owner.OpaqueId == "") {
if !(len(grantees) == 0 && (owner == nil || owner.OpaqueId == "")) {
ok, err := fs.p.HasPermission(ctx, node, func(rp *provider.ResourcePermissions) bool {
// TODO remove AddGrant or UpdateGrant grant from CS3 api, redundant? tracked in https://github.com/cs3org/cs3apis/issues/92
return rp.AddGrant || rp.UpdateGrant
Expand All @@ -91,7 +87,7 @@ func (fs *Decomposedfs) AddGrant(ctx context.Context, ref *provider.Reference, g

// when a grant is added to a space, do not add a new space under "shares"
if spaceGrant := ctx.Value(utils.SpaceGrant); spaceGrant == nil {
err := fs.createStorageSpace(ctx, spaceTypeShare, node.ID)
err := fs.linkStorageSpaceType(ctx, spaceTypeShare, node.ID)
if err != nil {
return err
}
Expand Down
7 changes: 1 addition & 6 deletions pkg/storage/utils/decomposedfs/lookup/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,7 @@ func (lu *Lookup) NodeFromID(ctx context.Context, id *provider.ResourceId) (n *n
// The Resource references the root of a space
return lu.NodeFromSpaceID(ctx, id)
}
n, err = node.ReadNode(ctx, lu, id.StorageId, id.OpaqueId)
if err != nil {
return nil, err
}

return n, n.FindStorageSpaceRoot()
return node.ReadNode(ctx, lu, id.StorageId, id.OpaqueId)
}

// Pathify segments the beginning of a string into depth segments of width length
Expand Down
175 changes: 99 additions & 76 deletions pkg/storage/utils/decomposedfs/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ func New(spaceID, id, parentID, name string, blobsize int64, blobID string, owne

// ChangeOwner sets the owner of n to newOwner
func (n *Node) ChangeOwner(new *userpb.UserId) (err error) {
nodePath := n.InternalPath()
n.owner = new
rootNodePath := n.SpaceRoot.InternalPath()
n.SpaceRoot.owner = new

var attribs = map[string]string{xattrs.OwnerIDAttr: new.OpaqueId,
xattrs.OwnerIDPAttr: new.Idp,
xattrs.OwnerTypeAttr: utils.UserTypeToString(new.Type)}

if err := xattrs.SetMultiple(nodePath, attribs); err != nil {
if err := xattrs.SetMultiple(rootNodePath, attribs); err != nil {
return err
}

Expand Down Expand Up @@ -158,7 +158,7 @@ func (n *Node) GetMetadata(key string) (val string, err error) {
}

// WriteAllNodeMetadata writes the Node metadata to disk
func (n *Node) WriteAllNodeMetadata(owner *userpb.UserId) (err error) {
func (n *Node) WriteAllNodeMetadata() (err error) {
attribs := make(map[string]string)

attribs[xattrs.ParentidAttr] = n.ParentID
Expand All @@ -167,55 +167,64 @@ func (n *Node) WriteAllNodeMetadata(owner *userpb.UserId) (err error) {
attribs[xattrs.BlobsizeAttr] = strconv.FormatInt(n.Blobsize, 10)

nodePath := n.InternalPath()
attribs[xattrs.OwnerIDAttr] = ""
attribs[xattrs.OwnerIDPAttr] = ""
attribs[xattrs.OwnerTypeAttr] = ""
return xattrs.SetMultiple(nodePath, attribs)
}

if owner != nil {
attribs[xattrs.OwnerIDAttr] = owner.OpaqueId
attribs[xattrs.OwnerIDPAttr] = owner.Idp
attribs[xattrs.OwnerTypeAttr] = utils.UserTypeToString(owner.Type)
func (n *Node) WriteOwner(owner *userpb.UserId) error {
attribs := map[string]string{
xattrs.OwnerIDAttr: owner.OpaqueId,
xattrs.OwnerIDPAttr: owner.Idp,
xattrs.OwnerTypeAttr: utils.UserTypeToString(owner.Type),
}
if err := xattrs.SetMultiple(nodePath, attribs); err != nil {
return err
}
return
nodeRootPath := n.SpaceRoot.InternalPath()
return xattrs.SetMultiple(nodeRootPath, attribs)

}

// ReadNode creates a new instance from an id and checks if it exists
func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string) (n *Node, err error) {
n = &Node{

// read space root
r := &Node{
SpaceID: spaceID,
lu: lu,
ID: nodeID,
ID: spaceID,
}

nodePath := n.InternalPath()

// lookup parent id in extended attributes
var attr string
attr, err = xattrs.Get(nodePath, xattrs.ParentidAttr)
r.SpaceRoot = r
r.owner, err = r.readOwner()
switch {
case err == nil:
n.ParentID = attr
case xattrs.IsAttrUnset(err):
return nil, errtypes.InternalError(err.Error())
case xattrs.IsNotExist(err):
return n, nil // swallow not found, the node defaults to exists = false
default:
return nil, errtypes.InternalError(err.Error())
return r, nil
case err != nil:
return nil, err
}
r.Exists = true

// check if this is a space root
if _, err = xattrs.Get(nodePath, xattrs.SpaceNameAttr); err == nil {
n.SpaceRoot = n
if spaceID == nodeID {
return r, nil
}

// read node
n = &Node{
SpaceID: spaceID,
lu: lu,
ID: nodeID,
SpaceRoot: r,
}

nodePath := n.InternalPath()

var attr string
// lookup name in extended attributes
if attr, err = xattrs.Get(nodePath, xattrs.NameAttr); err == nil {
n.Name = attr
} else {
return
}

n.Exists = true

// lookup blobID in extended attributes
if attr, err = xattrs.Get(nodePath, xattrs.BlobIDAttr); err == nil {
n.BlobID = attr
Expand All @@ -230,6 +239,40 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string) (n *No
return
}

// lookup parent id in extended attributes
attr, err = xattrs.Get(nodePath, xattrs.ParentidAttr)
switch {
case err == nil:
n.ParentID = attr
case xattrs.IsAttrUnset(err):
return nil, errtypes.InternalError(err.Error())
case xattrs.IsNotExist(err):
return n, nil // swallow not found, the node defaults to exists = false
default:
return nil, errtypes.InternalError(err.Error())
}

// TODO why do we stat the parent? to determine if the current node is in the trash we would need to traverse all parents...
// we need to traverse all parents for permissions anyway ...
// - we can compare to space root owner with the current user
// - we can compare the share permissions on the root for spaces, which would work for managers
// - for non managers / owners we need to traverse all path segments because an intermediate node might have been shared
// - if we want to support negative acls we need to traverse the path for all users (but the owner)
// for trashed items we need to check all parents
// - one of them might have the trash suffix ...
// - options:
// - move deleted nodes in a trash folder that is still part of the tree (aka freedesktop org trash spec)
// - shares should still be removed, which requires traversing all trashed children ... and it should be undoable ...
// - what if a trashed file is restored? will child items be accessible by a share?
// - compare paths of trash root items and the trashed file?
// - to determine the relative path of a file we would need to traverse all intermediate nodes anyway
// - recursively mark all children as trashed ... async ... it is ok when that is not synchronous
// - how do we pick up if an error occurs? write a journal somewhere? activity log / delta?
// - stat requests will not pick up trashed items at all
// - recursively move all children into the trash folder?
// - no need to write an additional trash entry
// - can be made more robust with a journal
// - same recursion mechanism can be used to purge items? sth we still need to do
_, err = os.Stat(n.ParentInternalPath())
if err != nil {
if os.IsNotExist(err) {
Expand All @@ -238,9 +281,6 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string) (n *No
return nil, err
}

// FIXME always set SpaceRoot? what about shares?

n.Exists = true
return
}

Expand Down Expand Up @@ -333,59 +373,58 @@ func (n *Node) Parent() (p *Node, err error) {
return
}

// Owner returns the cached owner id or reads it from the extended attributes
// TODO can be private as only the AsResourceInfo uses it
func (n *Node) Owner() (*userpb.UserId, error) {
if n.owner != nil {
return n.owner, nil
}
func (n *Node) Owner() *userpb.UserId {
return n.SpaceRoot.owner
}

// readOwner reads the owner from the extended attributes of the space root
// in case either owner id or owner idp are unset we return an error and an empty owner object
func (n *Node) readOwner() (*userpb.UserId, error) {

owner := &userpb.UserId{}

// FIXME ... do we return the owner of the reference or the owner of the target?
// we don't really know the owner of the target ... and as the reference may point anywhere we cannot really find out
// but what are the permissions? all? none? the gateway has to fill in?
// TODO what if this is a reference?
nodePath := n.InternalPath()
rootNodePath := n.SpaceRoot.InternalPath()
// lookup parent id in extended attributes
var attr string
var err error
// lookup ID in extended attributes
attr, err = xattrs.Get(nodePath, xattrs.OwnerIDAttr)
attr, err = xattrs.Get(rootNodePath, xattrs.OwnerIDAttr)
switch {
case err == nil:
owner.OpaqueId = attr
case xattrs.IsAttrUnset(err), xattrs.IsNotExist(err):
fallthrough
case xattrs.IsAttrUnset(err):
err = nil
default:
return nil, err
}

// lookup IDP in extended attributes
attr, err = xattrs.Get(nodePath, xattrs.OwnerIDPAttr)
attr, err = xattrs.Get(rootNodePath, xattrs.OwnerIDPAttr)
switch {
case err == nil:
owner.Idp = attr
case xattrs.IsAttrUnset(err), xattrs.IsNotExist(err):
fallthrough
err = nil
default:
return nil, err
}

// lookup type in extended attributes
attr, err = xattrs.Get(nodePath, xattrs.OwnerTypeAttr)
attr, err = xattrs.Get(rootNodePath, xattrs.OwnerTypeAttr)
switch {
case err == nil:
owner.Type = utils.UserTypeMap(attr)
case xattrs.IsAttrUnset(err), xattrs.IsNotExist(err):
fallthrough
default:
// TODO the user type defaults to invalid, which is the case
err = nil
default:
return nil, err
}

n.owner = owner
return n.owner, err
// owner is an optional property
if owner.Idp == "" && owner.OpaqueId == "" {
return nil, nil
}
return owner, nil
}

// PermissionSet returns the permission set for the current user
Expand All @@ -396,7 +435,7 @@ func (n *Node) PermissionSet(ctx context.Context) provider.ResourcePermissions {
appctx.GetLogger(ctx).Debug().Interface("node", n).Msg("no user in context, returning default permissions")
return NoPermissions()
}
if o, _ := n.Owner(); utils.UserEqual(u.Id, o) {
if utils.UserEqual(u.Id, n.SpaceRoot.Owner()) {
return OwnerPermissions()
}
// read the permissions for the current user from the acls of the current node
Expand Down Expand Up @@ -566,6 +605,7 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi
Size: uint64(n.Blobsize),
Target: target,
PermissionSet: rp,
Owner: n.Owner(),
}

if nodeType == provider.ResourceType_RESOURCE_TYPE_CONTAINER {
Expand All @@ -578,10 +618,6 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi
}
}

if ri.Owner, err = n.Owner(); err != nil {
sublog.Debug().Err(err).Msg("could not determine owner")
}

// TODO make etag of files use fileid and checksum

var tmTime time.Time
Expand Down Expand Up @@ -828,20 +864,7 @@ func (n *Node) UnsetTempEtag() (err error) {
// ReadUserPermissions will assemble the permissions for the current user on the given node without parent nodes
func (n *Node) ReadUserPermissions(ctx context.Context, u *userpb.User) (ap provider.ResourcePermissions, err error) {
// check if the current user is the owner
o, err := n.Owner()
if err != nil {
// TODO check if a parent folder has the owner set?
appctx.GetLogger(ctx).Error().Err(err).Str("node", n.ID).Msg("could not determine owner, returning default permissions")
return NoPermissions(), err
}
if o.OpaqueId == "" {
// this happens for root nodes and project spaces in the storage. the extended attributes are set to emptystring to indicate: no owner
// for project spaces we need to go over the grants and check the granted permissions
if n.ID == RootID {
return NoOwnerPermissions(), nil
}
}
if utils.UserEqual(u.Id, o) {
if utils.UserEqual(u.Id, n.Owner()) {
appctx.GetLogger(ctx).Debug().Str("node", n.ID).Msg("user is owner, returning owner permissions")
return OwnerPermissions(), nil
}
Expand Down
8 changes: 1 addition & 7 deletions pkg/storage/utils/decomposedfs/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"encoding/json"
"time"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/storage/utils/decomposedfs/node"
helpers "github.com/cs3org/reva/pkg/storage/utils/decomposedfs/testhelpers"
Expand Down Expand Up @@ -91,13 +90,8 @@ var _ = Describe("Node", func() {
n.Name = "TestName"
n.BlobID = "TestBlobID"
n.Blobsize = int64(blobsize)
owner := &userpb.UserId{
Idp: "testidp",
OpaqueId: "testuserid",
Type: userpb.UserType_USER_TYPE_PRIMARY,
}

err = n.WriteAllNodeMetadata(owner)
err = n.WriteAllNodeMetadata()
Expect(err).ToNot(HaveOccurred())
n2, err := env.Lookup.NodeFromResource(env.Ctx, ref)
Expect(err).ToNot(HaveOccurred())
Expand Down
Loading

0 comments on commit 0c50ef0

Please sign in to comment.