Skip to content

Commit

Permalink
[spaces 1/2] initial ListStorageSpaces implementation (#1802)
Browse files Browse the repository at this point in the history
  • Loading branch information
butonic authored Jul 8, 2021
1 parent 66996cb commit d2f72e0
Show file tree
Hide file tree
Showing 14 changed files with 470 additions and 34 deletions.
6 changes: 6 additions & 0 deletions changelog/unreleased/list-spaces.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Enhancement: Introduce list spaces

The ListStorageSpaces call now allows listing all user homes and shared resources using a storage space id. The gateway will forward requests to a specific storage provider when a filter by id is given. Otherwise it will query all storage providers. Results will be deduplicated. Currently, only the decomposed fs storage driver implements the necessary logic to demonstrate the implmentation. A new `/dav/spaces` WebDAV endpoint to directly access a storage space is introduced in a separate PR.

https://github.com/cs3org/reva/pull/1802
https://github.com/cs3org/reva/pull/1803
127 changes: 110 additions & 17 deletions internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,37 +115,130 @@ func (s *svc) CreateStorageSpace(ctx context.Context, req *provider.CreateStorag

func (s *svc) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSpacesRequest) (*provider.ListStorageSpacesResponse, error) {
log := appctx.GetLogger(ctx)
// TODO: needs to be fixed
var id *provider.StorageSpaceId
for _, f := range req.Filters {
if f.Type == provider.ListStorageSpacesRequest_Filter_TYPE_ID {
id = f.GetId()
}
}
parts := strings.SplitN(id.OpaqueId, "!", 2)
if len(parts) != 2 {

var (
providers []*registry.ProviderInfo
err error
)
c, err := pool.GetStorageRegistryClient(s.c.StorageRegistryEndpoint)
if err != nil {
return nil, errors.Wrap(err, "gateway: error getting storage registry client")
}

if id != nil {
// query that specific storage provider
parts := strings.SplitN(id.OpaqueId, "!", 2)
if len(parts) != 2 {
return &provider.ListStorageSpacesResponse{
Status: status.NewInvalidArg(ctx, "space id must be separated by !"),
}, nil
}
res, err := c.GetStorageProviders(ctx, &registry.GetStorageProvidersRequest{
Ref: &provider.Reference{ResourceId: &provider.ResourceId{
StorageId: parts[0], // FIXME REFERENCE the StorageSpaceId is a storageid + an opaqueid
OpaqueId: parts[1],
}},
})
if err != nil {
return &provider.ListStorageSpacesResponse{
Status: status.NewStatusFromErrType(ctx, "ListStorageSpaces filters: req "+req.String(), err),
}, nil
}
if res.Status.Code != rpc.Code_CODE_OK {
return &provider.ListStorageSpacesResponse{
Status: res.Status,
}, nil
}
providers = res.Providers
} else {
// get list of all storage providers
res, err := c.ListStorageProviders(ctx, &registry.ListStorageProvidersRequest{})

if err != nil {
return &provider.ListStorageSpacesResponse{
Status: status.NewStatusFromErrType(ctx, "error listing providers", err),
}, nil
}
if res.Status.Code != rpc.Code_CODE_OK {
return &provider.ListStorageSpacesResponse{
Status: res.Status,
}, nil
}

providers = make([]*registry.ProviderInfo, 0, len(res.Providers))
// FIXME filter only providers that have an id set ... currently none have?
// bug? only ProviderPath is set
for i := range res.Providers {
// use only providers whose path does not start with a /?
if strings.HasPrefix(res.Providers[i].ProviderPath, "/") {
continue
}
providers = append(providers, res.Providers[i])
}
}

spacesFromProviders := make([][]*provider.StorageSpace, len(providers))
errors := make([]error, len(providers))

var wg sync.WaitGroup
for i, p := range providers {
wg.Add(1)
go s.listStorageSpacesOnProvider(ctx, req, &spacesFromProviders[i], p, &errors[i], &wg)
}
wg.Wait()

uniqueSpaces := map[string]*provider.StorageSpace{}
for i := range providers {
if errors[i] != nil {
if len(providers) > 1 {
log.Debug().Err(errors[i]).Msg("skipping provider")
continue
}
return &provider.ListStorageSpacesResponse{
Status: status.NewStatusFromErrType(ctx, "error listing space", errors[i]),
}, nil
}
for j := range spacesFromProviders[i] {
uniqueSpaces[spacesFromProviders[i][j].Id.OpaqueId] = spacesFromProviders[i][j]
}
}
spaces := make([]*provider.StorageSpace, 0, len(uniqueSpaces))
for spaceID := range uniqueSpaces {
spaces = append(spaces, uniqueSpaces[spaceID])
}
if len(spaces) == 0 {
return &provider.ListStorageSpacesResponse{
Status: status.NewInvalidArg(ctx, "space id must be separated by !"),
Status: status.NewNotFound(ctx, "space not found"),
}, nil
}
c, err := s.find(ctx, &provider.Reference{ResourceId: &provider.ResourceId{
StorageId: parts[0], // FIXME REFERENCE the StorageSpaceId is a storageid + a opaqueid
OpaqueId: parts[1],
}})

return &provider.ListStorageSpacesResponse{
Status: status.NewOK(ctx),
StorageSpaces: spaces,
}, nil
}

func (s *svc) listStorageSpacesOnProvider(ctx context.Context, req *provider.ListStorageSpacesRequest, res *[]*provider.StorageSpace, p *registry.ProviderInfo, e *error, wg *sync.WaitGroup) {
defer wg.Done()
c, err := s.getStorageProviderClient(ctx, p)
if err != nil {
return &provider.ListStorageSpacesResponse{
Status: status.NewStatusFromErrType(ctx, "error finding path", err),
}, nil
*e = errors.Wrap(err, "error connecting to storage provider="+p.Address)
return
}

res, err := c.ListStorageSpaces(ctx, req)
r, err := c.ListStorageSpaces(ctx, req)
if err != nil {
log.Err(err).Msg("gateway: error listing storage space on storage provider")
return &provider.ListStorageSpacesResponse{
Status: status.NewInternal(ctx, err, "error calling ListStorageSpaces"),
}, nil
*e = errors.Wrap(err, "gateway: error calling ListStorageSpaces")
return
}
return res, nil

*res = r.StorageSpaces
}

func (s *svc) UpdateStorageSpace(ctx context.Context, req *provider.UpdateStorageSpaceRequest) (*provider.UpdateStorageSpaceResponse, error) {
Expand Down
41 changes: 40 additions & 1 deletion internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,48 @@ func (s *service) CreateStorageSpace(ctx context.Context, req *provider.CreateSt
}, nil
}

func hasNodeID(s *provider.StorageSpace) bool {
return s != nil && s.Root != nil && s.Root.OpaqueId != ""
}

func (s *service) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSpacesRequest) (*provider.ListStorageSpacesResponse, error) {
log := appctx.GetLogger(ctx)
spaces, err := s.storage.ListStorageSpaces(ctx, req.Filters)
if err != nil {
var st *rpc.Status
switch err.(type) {
case errtypes.IsNotFound:
st = status.NewNotFound(ctx, "not found when listing spaces")
case errtypes.PermissionDenied:
st = status.NewPermissionDenied(ctx, err, "permission denied")
case errtypes.NotSupported:
st = status.NewUnimplemented(ctx, err, "not implemented")
default:
st = status.NewInternal(ctx, err, "error listing spaces")
}
return &provider.ListStorageSpacesResponse{
Status: st,
}, nil
}

for i := range spaces {
if hasNodeID(spaces[i]) {
// fill in storagespace id if it is not set
if spaces[i].Id == nil || spaces[i].Id.OpaqueId == "" {
spaces[i].Id = &provider.StorageSpaceId{OpaqueId: s.mountID + "!" + spaces[i].Root.OpaqueId}
}
// fill in storage id if it is not set
if spaces[i].Root.StorageId == "" {
spaces[i].Root.StorageId = s.mountID
}
} else if spaces[i].Id == nil || spaces[i].Id.OpaqueId == "" {
log.Warn().Str("service", "storageprovider").Str("driver", s.conf.Driver).Interface("space", spaces[i]).Msg("space is missing space id and root id")
}
}

return &provider.ListStorageSpacesResponse{
Status: status.NewUnimplemented(ctx, errtypes.NotSupported("ListStorageSpaces not implemented"), "ListStorageSpaces not implemented"),
Status: status.NewOK(ctx),
StorageSpaces: spaces,
}, nil
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/fs/owncloud/owncloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -2221,6 +2221,10 @@ func (fs *ocfs) RestoreRecycleItem(ctx context.Context, key string, restoreRef *
return fs.propagate(ctx, tgt)
}

func (fs *ocfs) ListStorageSpaces(ctx context.Context, filter []*provider.ListStorageSpacesRequest_Filter) ([]*provider.StorageSpace, error) {
return nil, errtypes.NotSupported("list storage spaces")
}

func (fs *ocfs) propagate(ctx context.Context, leafPath string) error {
var root string
if fs.c.EnableHome {
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/fs/owncloudsql/owncloudsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2132,6 +2132,11 @@ func (fs *ocfs) HashFile(path string) (string, string, string, error) {
}
}

func (fs *ocfs) ListStorageSpaces(ctx context.Context, filter []*provider.ListStorageSpacesRequest_Filter) ([]*provider.StorageSpace, error) {
// TODO(corby): Implement
return nil, errtypes.NotSupported("list storage spaces")
}

func readChecksumIntoResourceChecksum(ctx context.Context, checksums, algo string, ri *provider.ResourceInfo) {
re := regexp.MustCompile(strings.ToUpper(algo) + `:(.*)`)
matches := re.FindStringSubmatch(checksums)
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/fs/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,3 +661,7 @@ func (fs *s3FS) ListRecycle(ctx context.Context) ([]*provider.RecycleItem, error
func (fs *s3FS) RestoreRecycleItem(ctx context.Context, key string, restoreRef *provider.Reference) error {
return errtypes.NotSupported("restore recycle")
}

func (fs *s3FS) ListStorageSpaces(ctx context.Context, filter []*provider.ListStorageSpacesRequest_Filter) ([]*provider.StorageSpace, error) {
return nil, errtypes.NotSupported("list storage spaces")
}
29 changes: 18 additions & 11 deletions pkg/storage/registry/static/static.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,25 @@ func (b *reg) FindProviders(ctx context.Context, ref *provider.Reference) ([]*re

// If the reference has a resource id set, use it to route
if ref.ResourceId != nil {
for prefix, rule := range b.c.Rules {
addr := getProviderAddr(ctx, rule)
r, err := regexp.Compile("^" + prefix + "$")
if err != nil {
continue
if ref.ResourceId.StorageId != "" {
for prefix, rule := range b.c.Rules {
addr := getProviderAddr(ctx, rule)
r, err := regexp.Compile("^" + prefix + "$")
if err != nil {
continue
}
// TODO(labkode): fill path info based on provider id, if path and storage id points to same id, take that.
if m := r.FindString(ref.ResourceId.StorageId); m != "" {
return []*registrypb.ProviderInfo{{
ProviderId: ref.ResourceId.StorageId,
Address: addr,
}}, nil
}
}
// TODO(labkode): fill path info based on provider id, if path and storage id points to same id, take that.
if m := r.FindString(ref.ResourceId.StorageId); m != "" {
return []*registrypb.ProviderInfo{{
ProviderId: ref.ResourceId.StorageId,
Address: addr,
}}, nil
// TODO if the storage id is not set but node id is set we could poll all storage providers to check if the node is known there
// for now, say the reference is invalid
if ref.ResourceId.OpaqueId != "" {
return nil, errtypes.BadRequest("invalid reference " + ref.String())
}
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type FS interface {
Shutdown(ctx context.Context) error
SetArbitraryMetadata(ctx context.Context, ref *provider.Reference, md *provider.ArbitraryMetadata) error
UnsetArbitraryMetadata(ctx context.Context, ref *provider.Reference, keys []string) error
ListStorageSpaces(ctx context.Context, filter []*provider.ListStorageSpacesRequest_Filter) ([]*provider.StorageSpace, error)
}

// Registry is the interface that storage registries implement
Expand Down
Loading

0 comments on commit d2f72e0

Please sign in to comment.