Skip to content

Commit

Permalink
Adding in code code allowing generic shared file system as an input
Browse files Browse the repository at this point in the history
  • Loading branch information
kellrott committed Oct 17, 2016
1 parent ffcb580 commit de2be63
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 35 deletions.
15 changes: 8 additions & 7 deletions bin/tes-runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import json
import time
import requests
import urllib
import argparse

if __name__ == "__main__":
Expand All @@ -14,18 +14,19 @@
with open(args.task) as handle:
task = json.loads(handle.read())

r = requests.post("%s/v1/jobs" % (args.server), json=task)
data = r.json()
u = urllib.urlopen("%s/v1/jobs" % (args.server), json.dumps(task))
data = json.loads(u.read())

print data
job_id = data['value']

for i in range(10):
r = requests.get("%s/v1/jobs/%s" % (args.server, job_id))
data = r.json()
while True:
r = urllib.urlopen("%s/v1/jobs/%s" % (args.server, job_id))
data = json.loads(r.read())
if data["state"] not in ['Queued', "Running"]:
break
time.sleep(1)
print data
print json.dumps(data, indent=4)



9 changes: 9 additions & 0 deletions src/tes-worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"google.golang.org/grpc"
"log"
"os"
"strings"
"path/filepath"
"tes/server/proto"
"tes/worker"
Expand All @@ -16,6 +17,7 @@ func main() {
agro_server := flag.String("master", "localhost:9090", "Master Server")
volume_dir_arg := flag.String("volumes", "volumes", "Volume Dir")
storage_dir_arg := flag.String("storage", "storage", "Storage Dir")
file_system_arg := flag.String("files", "", "Allowed File Paths")
swift_dir_arg := flag.String("swift", "", "Cache Swift items in directory")
timeout_arg := flag.Int("timeout", -1, "Timeout in seconds")

Expand Down Expand Up @@ -43,6 +45,13 @@ func main() {
}

file_client = tes_taskengine_worker.NewSwiftAccess()
} else if *file_system_arg != "" {
o := []string{}
for _, i := range strings.Split(*file_system_arg, ",") {
p, _ := filepath.Abs(i)
o = append(o, p)
}
file_client = tes_taskengine_worker.NewFileAccess(o)
} else {
storage_dir, _ := filepath.Abs(*storage_dir_arg)
if _, err := os.Stat(storage_dir); os.IsNotExist(err) {
Expand Down
10 changes: 10 additions & 0 deletions src/tes/server/task_boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ func (self *TaskBolt) RunTask(ctx context.Context, task *ga4gh_task_exec.Task) (
if !disk_found {
return nil, fmt.Errorf("Required volume '%s' not found in resources", input.Path)
}
//Fixing blank value to File by default... Is this too much hand holding?
if input.Class == "" {
input.Class = "File"
}
}

for _, output := range task.GetOutputs() {
if output.Class == "" {
output.Class = "File"
}
}

ch := make(chan *ga4gh_task_exec.JobId, 1)
Expand Down
8 changes: 6 additions & 2 deletions src/tes/worker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@ func (self DockerCmd) Run(containerName string, args []string,

log.Printf("Docker Binds: %s", binds)

docker_args := []string{"run", "--rm", "-i", "-w", workdir}
docker_args := []string{"run", "--rm", "-i"}

if workdir != "" {
docker_args = append(docker_args, "-w", workdir)
}

for _, i := range binds {
docker_args = append(docker_args, "-v", i)
}
docker_args = append(docker_args, containerName)
docker_args = append(docker_args, args...)
log.Printf("Runner docker %s", strings.Join(args, " "))
log.Printf("Runner docker %s", strings.Join(docker_args, " "))

cmd := exec.Command("docker", docker_args...)

Expand Down
62 changes: 62 additions & 0 deletions src/tes/worker/file_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package tes_taskengine_worker

import (
"fmt"
"log"
"strings"
)

var FILE_PROTOCOL = "file://"

type FileAccess struct {
Allowed []string
}

func NewFileAccess(allowed []string) *FileAccess {
return &FileAccess{Allowed: allowed}
}

func (self *FileAccess) Get(storage string, hostPath string, class string) error {
log.Printf("Starting download of %s", storage)
storage = strings.TrimPrefix(storage, FILE_PROTOCOL)
found := false
for _, i := range self.Allowed {
if strings.HasPrefix(storage, i) {
found = true
}
}
if !found {
return fmt.Errorf("Can't access file %s", storage)
}
if class == "File" {
CopyFile(storage, hostPath)
return nil
} else if class == "Directory" {
CopyDir(storage, hostPath)
return nil
}
return fmt.Errorf("Unknown element type: %s", class)

}

func (self *FileAccess) Put(storage string, hostPath string, class string) error {
log.Printf("Starting upload of %s", storage)
storage = strings.TrimPrefix(storage, FILE_PROTOCOL)
found := false
for _, i := range self.Allowed {
if strings.HasPrefix(storage, i) {
found = true
}
}
if !found {
return fmt.Errorf("Can't access file %s", storage)
}
if class == "File" {
CopyFile(hostPath, storage)
return nil
} else if class == "Directory" {
CopyDir(hostPath, storage)
return nil
}
return fmt.Errorf("Unknown element type: %s", class)
}
4 changes: 2 additions & 2 deletions src/tes/worker/file_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type JobFileMapper struct {
}

type FileSystemAccess interface {
Get(storage string, path string) error
Get(storage string, path string, class string) error
Put(storage string, path string, class string) error
}

Expand Down Expand Up @@ -84,7 +84,7 @@ func (self *FileMapper) MapInput(jobId string, storage string, mountPath string,
if len(base) > 0 {
dstPath := path.Join(vol.HostPath, relpath)
fmt.Printf("get %s %s\n", storage, dstPath)
err := self.fileSystem.Get(storage, dstPath)
err := self.fileSystem.Get(storage, dstPath, class)
if err != nil {
return err
}
Expand Down
10 changes: 8 additions & 2 deletions src/tes/worker/fs_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@ func NewSharedFS(base string) *FileStorageAccess {
return &FileStorageAccess{StorageDir: base}
}

func (self *FileStorageAccess) Get(storage string, hostPath string) error {
func (self *FileStorageAccess) Get(storage string, hostPath string, class string) error {
storage = strings.TrimPrefix(storage, "fs://")
srcPath := path.Join(self.StorageDir, storage)
if _, err := os.Stat(srcPath); os.IsNotExist(err) {
return fmt.Errorf("storage file '%s' not found", srcPath)
}
copyFileContents(srcPath, hostPath)
if class == "File" {
copyFileContents(srcPath, hostPath)
} else if class == "Directory" {
CopyDir(srcPath, hostPath)
} else {
return fmt.Errorf("Unknown element type: %s", class)
}
return nil
}

Expand Down
49 changes: 27 additions & 22 deletions src/tes/worker/swift_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,35 +37,40 @@ func NewSwiftAccess() *SwiftAccess {

}

func (self *SwiftAccess) Get(storage string, hostPath string) error {
func (self *SwiftAccess) Get(storage string, hostPath string, class string) error {
log.Printf("Starting download of %s", storage)
storage = strings.TrimPrefix(storage, SWIFT_PROTOCOL)
storage_split := strings.SplitN(storage, "/", 2)

// Download everything into a DownloadResult struct
opts := objects.DownloadOpts{}
res := objects.Download(self.client, storage_split[0], storage_split[1], opts)
if class == "File" {
// Download everything into a DownloadResult struct
opts := objects.DownloadOpts{}
res := objects.Download(self.client, storage_split[0], storage_split[1], opts)

file, err := os.Create(hostPath)
if err != nil {
return err
}
buffer := make([]byte, 10240)
total_len := 0
for {
len, err := res.Body.Read(buffer)
if err == io.EOF {
break
} else if err != nil {
return fmt.Errorf("Error reading file")
file, err := os.Create(hostPath)
if err != nil {
return err
}
total_len += len
file.Write(buffer[:len])
buffer := make([]byte, 10240)
total_len := 0
for {
len, err := res.Body.Read(buffer)
if err == io.EOF {
break
} else if err != nil {
return fmt.Errorf("Error reading file")
}
total_len += len
file.Write(buffer[:len])
}
file.Close()
res.Body.Close()
log.Printf("Downloaded %d bytes", total_len)
return nil
} else if class == "Directory" {
return fmt.Errorf("SWIFT directories not yet supported")
}
file.Close()
res.Body.Close()
log.Printf("Downloaded %d bytes", total_len)
return nil
return fmt.Errorf("Unknown element type: %s", class)
}

func (self *SwiftAccess) Put(storage string, hostPath string, class string) error {
Expand Down

0 comments on commit de2be63

Please sign in to comment.