From de2be632f2e8a11faac733adba872ea2f342cb27 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Mon, 17 Oct 2016 14:26:14 -0700 Subject: [PATCH] Adding in code code allowing generic shared file system as an input --- bin/tes-runner.py | 15 ++++---- src/tes-worker/worker.go | 9 +++++ src/tes/server/task_boltdb.go | 10 ++++++ src/tes/worker/docker.go | 8 +++-- src/tes/worker/file_client.go | 62 ++++++++++++++++++++++++++++++++++ src/tes/worker/file_mapper.go | 4 +-- src/tes/worker/fs_client.go | 10 ++++-- src/tes/worker/swift_client.go | 49 +++++++++++++++------------ 8 files changed, 132 insertions(+), 35 deletions(-) create mode 100644 src/tes/worker/file_client.go diff --git a/bin/tes-runner.py b/bin/tes-runner.py index 25b9347..5bbcbd9 100755 --- a/bin/tes-runner.py +++ b/bin/tes-runner.py @@ -2,7 +2,7 @@ import json import time -import requests +import urllib import argparse if __name__ == "__main__": @@ -14,18 +14,19 @@ with open(args.task) as handle: task = json.loads(handle.read()) - r = requests.post("%s/v1/jobs" % (args.server), json=task) - data = r.json() + u = urllib.urlopen("%s/v1/jobs" % (args.server), json.dumps(task)) + data = json.loads(u.read()) + print data job_id = data['value'] - for i in range(10): - r = requests.get("%s/v1/jobs/%s" % (args.server, job_id)) - data = r.json() + while True: + r = urllib.urlopen("%s/v1/jobs/%s" % (args.server, job_id)) + data = json.loads(r.read()) if data["state"] not in ['Queued', "Running"]: break time.sleep(1) - print data + print json.dumps(data, indent=4) diff --git a/src/tes-worker/worker.go b/src/tes-worker/worker.go index b78f036..76c7d78 100644 --- a/src/tes-worker/worker.go +++ b/src/tes-worker/worker.go @@ -6,6 +6,7 @@ import ( "google.golang.org/grpc" "log" "os" + "strings" "path/filepath" "tes/server/proto" "tes/worker" @@ -16,6 +17,7 @@ func main() { agro_server := flag.String("master", "localhost:9090", "Master Server") volume_dir_arg := flag.String("volumes", "volumes", "Volume Dir") storage_dir_arg := flag.String("storage", "storage", "Storage Dir") + file_system_arg := flag.String("files", "", "Allowed File Paths") swift_dir_arg := flag.String("swift", "", "Cache Swift items in directory") timeout_arg := flag.Int("timeout", -1, "Timeout in seconds") @@ -43,6 +45,13 @@ func main() { } file_client = tes_taskengine_worker.NewSwiftAccess() + } else if *file_system_arg != "" { + o := []string{} + for _, i := range strings.Split(*file_system_arg, ",") { + p, _ := filepath.Abs(i) + o = append(o, p) + } + file_client = tes_taskengine_worker.NewFileAccess(o) } else { storage_dir, _ := filepath.Abs(*storage_dir_arg) if _, err := os.Stat(storage_dir); os.IsNotExist(err) { diff --git a/src/tes/server/task_boltdb.go b/src/tes/server/task_boltdb.go index 1d78ade..0b44a04 100644 --- a/src/tes/server/task_boltdb.go +++ b/src/tes/server/task_boltdb.go @@ -71,6 +71,16 @@ func (self *TaskBolt) RunTask(ctx context.Context, task *ga4gh_task_exec.Task) ( if !disk_found { return nil, fmt.Errorf("Required volume '%s' not found in resources", input.Path) } + //Fixing blank value to File by default... Is this too much hand holding? + if input.Class == "" { + input.Class = "File" + } + } + + for _, output := range task.GetOutputs() { + if output.Class == "" { + output.Class = "File" + } } ch := make(chan *ga4gh_task_exec.JobId, 1) diff --git a/src/tes/worker/docker.go b/src/tes/worker/docker.go index 362a175..3d711d5 100644 --- a/src/tes/worker/docker.go +++ b/src/tes/worker/docker.go @@ -20,14 +20,18 @@ func (self DockerCmd) Run(containerName string, args []string, log.Printf("Docker Binds: %s", binds) - docker_args := []string{"run", "--rm", "-i", "-w", workdir} + docker_args := []string{"run", "--rm", "-i"} + + if workdir != "" { + docker_args = append(docker_args, "-w", workdir) + } for _, i := range binds { docker_args = append(docker_args, "-v", i) } docker_args = append(docker_args, containerName) docker_args = append(docker_args, args...) - log.Printf("Runner docker %s", strings.Join(args, " ")) + log.Printf("Runner docker %s", strings.Join(docker_args, " ")) cmd := exec.Command("docker", docker_args...) diff --git a/src/tes/worker/file_client.go b/src/tes/worker/file_client.go new file mode 100644 index 0000000..c477127 --- /dev/null +++ b/src/tes/worker/file_client.go @@ -0,0 +1,62 @@ +package tes_taskengine_worker + +import ( + "fmt" + "log" + "strings" +) + +var FILE_PROTOCOL = "file://" + +type FileAccess struct { + Allowed []string +} + +func NewFileAccess(allowed []string) *FileAccess { + return &FileAccess{Allowed: allowed} +} + +func (self *FileAccess) Get(storage string, hostPath string, class string) error { + log.Printf("Starting download of %s", storage) + storage = strings.TrimPrefix(storage, FILE_PROTOCOL) + found := false + for _, i := range self.Allowed { + if strings.HasPrefix(storage, i) { + found = true + } + } + if !found { + return fmt.Errorf("Can't access file %s", storage) + } + if class == "File" { + CopyFile(storage, hostPath) + return nil + } else if class == "Directory" { + CopyDir(storage, hostPath) + return nil + } + return fmt.Errorf("Unknown element type: %s", class) + +} + +func (self *FileAccess) Put(storage string, hostPath string, class string) error { + log.Printf("Starting upload of %s", storage) + storage = strings.TrimPrefix(storage, FILE_PROTOCOL) + found := false + for _, i := range self.Allowed { + if strings.HasPrefix(storage, i) { + found = true + } + } + if !found { + return fmt.Errorf("Can't access file %s", storage) + } + if class == "File" { + CopyFile(hostPath, storage) + return nil + } else if class == "Directory" { + CopyDir(hostPath, storage) + return nil + } + return fmt.Errorf("Unknown element type: %s", class) +} diff --git a/src/tes/worker/file_mapper.go b/src/tes/worker/file_mapper.go index 061d051..af8a547 100644 --- a/src/tes/worker/file_mapper.go +++ b/src/tes/worker/file_mapper.go @@ -25,7 +25,7 @@ type JobFileMapper struct { } type FileSystemAccess interface { - Get(storage string, path string) error + Get(storage string, path string, class string) error Put(storage string, path string, class string) error } @@ -84,7 +84,7 @@ func (self *FileMapper) MapInput(jobId string, storage string, mountPath string, if len(base) > 0 { dstPath := path.Join(vol.HostPath, relpath) fmt.Printf("get %s %s\n", storage, dstPath) - err := self.fileSystem.Get(storage, dstPath) + err := self.fileSystem.Get(storage, dstPath, class) if err != nil { return err } diff --git a/src/tes/worker/fs_client.go b/src/tes/worker/fs_client.go index d55e708..89d9a47 100644 --- a/src/tes/worker/fs_client.go +++ b/src/tes/worker/fs_client.go @@ -16,13 +16,19 @@ func NewSharedFS(base string) *FileStorageAccess { return &FileStorageAccess{StorageDir: base} } -func (self *FileStorageAccess) Get(storage string, hostPath string) error { +func (self *FileStorageAccess) Get(storage string, hostPath string, class string) error { storage = strings.TrimPrefix(storage, "fs://") srcPath := path.Join(self.StorageDir, storage) if _, err := os.Stat(srcPath); os.IsNotExist(err) { return fmt.Errorf("storage file '%s' not found", srcPath) } - copyFileContents(srcPath, hostPath) + if class == "File" { + copyFileContents(srcPath, hostPath) + } else if class == "Directory" { + CopyDir(srcPath, hostPath) + } else { + return fmt.Errorf("Unknown element type: %s", class) + } return nil } diff --git a/src/tes/worker/swift_client.go b/src/tes/worker/swift_client.go index bdc1060..df12d42 100644 --- a/src/tes/worker/swift_client.go +++ b/src/tes/worker/swift_client.go @@ -37,35 +37,40 @@ func NewSwiftAccess() *SwiftAccess { } -func (self *SwiftAccess) Get(storage string, hostPath string) error { +func (self *SwiftAccess) Get(storage string, hostPath string, class string) error { log.Printf("Starting download of %s", storage) storage = strings.TrimPrefix(storage, SWIFT_PROTOCOL) storage_split := strings.SplitN(storage, "/", 2) - // Download everything into a DownloadResult struct - opts := objects.DownloadOpts{} - res := objects.Download(self.client, storage_split[0], storage_split[1], opts) + if class == "File" { + // Download everything into a DownloadResult struct + opts := objects.DownloadOpts{} + res := objects.Download(self.client, storage_split[0], storage_split[1], opts) - file, err := os.Create(hostPath) - if err != nil { - return err - } - buffer := make([]byte, 10240) - total_len := 0 - for { - len, err := res.Body.Read(buffer) - if err == io.EOF { - break - } else if err != nil { - return fmt.Errorf("Error reading file") + file, err := os.Create(hostPath) + if err != nil { + return err } - total_len += len - file.Write(buffer[:len]) + buffer := make([]byte, 10240) + total_len := 0 + for { + len, err := res.Body.Read(buffer) + if err == io.EOF { + break + } else if err != nil { + return fmt.Errorf("Error reading file") + } + total_len += len + file.Write(buffer[:len]) + } + file.Close() + res.Body.Close() + log.Printf("Downloaded %d bytes", total_len) + return nil + } else if class == "Directory" { + return fmt.Errorf("SWIFT directories not yet supported") } - file.Close() - res.Body.Close() - log.Printf("Downloaded %d bytes", total_len) - return nil + return fmt.Errorf("Unknown element type: %s", class) } func (self *SwiftAccess) Put(storage string, hostPath string, class string) error {