Skip to content

Commit

Permalink
First working implementation of HTTP GET/PUT towards EOS
Browse files Browse the repository at this point in the history
  • Loading branch information
Fabrizio Furano committed Feb 24, 2021
1 parent 58fb8dc commit a52cfc8
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 41 deletions.
76 changes: 46 additions & 30 deletions pkg/eosclient/eosgrpc/eosgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path"
Expand All @@ -35,6 +34,7 @@ import (
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/eosclient"
erpc "github.com/cs3org/reva/pkg/eosclient/eosgrpc/eos_grpc"
ehttp "github.com/cs3org/reva/pkg/eosclient/eosgrpc/eos_http"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/storage/utils/acl"
"github.com/google/uuid"
Expand All @@ -59,21 +59,13 @@ const (
// Options to configure the Client.
type Options struct {

// ForceSingleUserMode forces all connections to use only one user.
// This is the case when access to EOS is done from FUSE under apache or www-data.
ForceSingleUserMode bool

// UseKeyTabAuth changes will authenticate requests by using an EOS keytab.
UseKeytab bool

// Whether to maintain the same inode across various versions of a file.
// Requires extra metadata operations if set to true
VersionInvariant bool

// SingleUsername is the username to use when connecting to EOS.
// Defaults to apache
SingleUsername string

// Location of the xrdcopy binary.
// Default is /opt/eos/xrootd/bin/xrdcopy.
XrdcopyBinary string
Expand All @@ -99,12 +91,11 @@ type Options struct {
// SecProtocol is the comma separated list of security protocols used by xrootd.
// For example: "sss, unix"
SecProtocol string

httpopts ehttp.Options
}

func (opt *Options) init() {
if opt.ForceSingleUserMode && opt.SingleUsername != "" {
opt.SingleUsername = "apache"
}

if opt.XrdcopyBinary == "" {
opt.XrdcopyBinary = "/opt/eos/xrootd/bin/xrdcopy"
Expand All @@ -117,6 +108,10 @@ func (opt *Options) init() {
if opt.CacheDirectory == "" {
opt.CacheDirectory = os.TempDir()
}

opt.httpopts.Init()
opt.httpopts.BaseURL = opt.URL

}

// Client performs actions against a EOS management node (MGM)
Expand All @@ -126,6 +121,10 @@ type Client struct {
cl erpc.EosClient
}

func (c *Client) GetHttpCl() *ehttp.EosHttpClient {
return ehttp.New(&c.opt.httpopts)
}

// Create and connect a grpc eos Client
func newgrpc(ctx context.Context, opt *Options) (erpc.EosClient, error) {
log := appctx.GetLogger(ctx)
Expand Down Expand Up @@ -974,11 +973,24 @@ func (c *Client) Read(ctx context.Context, uid, gid, path string) (io.ReadCloser
localTarget := fmt.Sprintf("%s/%s", c.opt.CacheDirectory, rand)
defer os.RemoveAll(localTarget)

xrdPath := fmt.Sprintf("%s//%s", c.opt.URL, path)
cmd := exec.CommandContext(ctx, c.opt.XrdcopyBinary, "--nopbar", "--silent", "-f", xrdPath, localTarget, fmt.Sprintf("-OSeos.ruid=%s&eos.rgid=%s", uid, gid))
if _, _, err := c.execute(ctx, cmd); err != nil {
return nil, err
localfile, err := os.Create(localTarget)
if err != nil {
log.Error().Str("func", "Read").Str("path", path).Str("uid,gid", uid+","+gid).Str("err", err.Error()).Msg("")
return nil, errtypes.InternalError(fmt.Sprintf("can't open local cache file '%s'", localTarget))
}

// xrdPath := fmt.Sprintf("%s//%s", c.opt.URL, path)
// cmd := exec.CommandContext(ctx, c.opt.XrdcopyBinary, "--nopbar", "--silent", "-f", xrdPath, localTarget, fmt.Sprintf("-OSeos.ruid=%s&eos.rgid=%s", uid, gid))
// if _, _, err := c.execute(ctx, cmd); err != nil {
// return nil, err
// }

err = c.GetHttpCl().GETFile(ctx, "", uid, gid, path, localfile)
if err != nil {
log.Error().Str("func", "Read").Str("path", path).Str("uid,gid", uid+","+gid).Str("err", err.Error()).Msg("")
return nil, errtypes.InternalError(fmt.Sprintf("can't GET local cache file '%s'", localTarget))
}

return os.Open(localTarget)
}

Expand All @@ -987,24 +999,28 @@ func (c *Client) Write(ctx context.Context, uid, gid, path string, stream io.Rea
log := appctx.GetLogger(ctx)
log.Info().Str("func", "Write").Str("uid,gid", uid+","+gid).Str("path", path).Msg("")

fd, err := ioutil.TempFile(c.opt.CacheDirectory, "eoswrite-")
if err != nil {
return err
}
defer fd.Close()
defer os.RemoveAll(fd.Name())

// copy stream to local temp file
_, err = io.Copy(fd, stream)
if err != nil {
return err
}

return c.WriteFile(ctx, uid, gid, path, fd.Name())
//fd, err := ioutil.TempFile(c.opt.CacheDirectory, "eoswrite-")
//if err != nil {
// return err
// }
// defer fd.Close()
// defer os.RemoveAll(fd.Name())
//
// // copy stream to local temp file
// _, err = io.Copy(fd, stream)
// if err != nil {
//return err
//}

return c.GetHttpCl().PUTFile(ctx, "", uid, gid, path, stream)

//return c.GetHttpCl().PUTFile(ctx, remoteuser, uid, gid, urlpathng, stream)
//return c.WriteFile(ctx, uid, gid, path, fd.Name())
}

// WriteFile writes an existing file to the mgm
func (c *Client) WriteFile(ctx context.Context, uid, gid, path, source string) error {

log := appctx.GetLogger(ctx)
log.Info().Str("func", "WriteFile").Str("uid,gid", uid+","+gid).Str("path", path).Str("source", source).Msg("")

Expand Down
20 changes: 9 additions & 11 deletions pkg/storage/utils/eosfs/eosfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,15 @@ func NewEOSFS(c *Config) (storage.FS, error) {
var eosClient eosclient.EOSClient
if c.UseGRPC {
eosClientOpts := &eosgrpc.Options{
XrdcopyBinary: c.XrdcopyBinary,
URL: c.MasterURL,
GrpcURI: c.GrpcURI,
CacheDirectory: c.CacheDirectory,
ForceSingleUserMode: c.ForceSingleUserMode,
SingleUsername: c.SingleUsername,
UseKeytab: c.UseKeytab,
Keytab: c.Keytab,
Authkey: c.GRPCAuthkey,
SecProtocol: c.SecProtocol,
VersionInvariant: c.VersionInvariant,
XrdcopyBinary: c.XrdcopyBinary,
URL: c.MasterURL,
GrpcURI: c.GrpcURI,
CacheDirectory: c.CacheDirectory,
UseKeytab: c.UseKeytab,
Keytab: c.Keytab,
Authkey: c.GRPCAuthkey,
SecProtocol: c.SecProtocol,
VersionInvariant: c.VersionInvariant,
}
eosClient = eosgrpc.New(eosClientOpts)
} else {
Expand Down

0 comments on commit a52cfc8

Please sign in to comment.