Skip to content

Commit

Permalink
Fixing bugs with new schema.
Browse files Browse the repository at this point in the history
  • Loading branch information
kellrott committed Oct 5, 2016
1 parent e6d9bb7 commit 9ea6f1b
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 20 deletions.
4 changes: 2 additions & 2 deletions src/ga4gh-engine/worker/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ func RunJob(job *ga4gh_task_exec.Job, mapper FileMapper) error {
}

for _, input := range job.Task.Inputs {
err := mapper.MapInput(job.JobId, input.Location, input.Path, input.Directory)
err := mapper.MapInput(job.JobId, input.Location, input.Path, input.Class)
if err != nil {
return err
}
}

for _, output := range job.Task.Outputs {
err := mapper.MapOutput(job.JobId, output.Location, output.Path, output.Directory, output.Create)
err := mapper.MapOutput(job.JobId, output.Location, output.Path, output.Class, output.Create)
if err != nil {
return err
}
Expand Down
16 changes: 9 additions & 7 deletions src/ga4gh-engine/worker/file_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type JobFileMapper struct {

type FileSystemAccess interface {
Get(storage string, path string) error
Put(storage string, path string, directory bool) error
Put(storage string, path string, class string) error
}

type EngineStatus struct {
Expand Down Expand Up @@ -78,7 +78,7 @@ func (self *FileMapper) HostPath(jobId string, mountPath string) string {
return ""
}

func (self *FileMapper) MapInput(jobId string, storage string, mountPath string, directory bool) error {
func (self *FileMapper) MapInput(jobId string, storage string, mountPath string, class string) error {
for _, vol := range self.jobs[jobId].Bindings {
base, relpath := pathMatch(vol.ContainerPath, mountPath)
if len(base) > 0 {
Expand All @@ -93,17 +93,19 @@ func (self *FileMapper) MapInput(jobId string, storage string, mountPath string,
return nil
}

func (self *FileMapper) MapOutput(jobId string, storage string, mountPath string, directory bool, create bool) error {
a := ga4gh_task_exec.TaskParameter{Location: storage, Path: mountPath, Create: create, Directory: directory}
func (self *FileMapper) MapOutput(jobId string, storage string, mountPath string, class string, create bool) error {
a := ga4gh_task_exec.TaskParameter{Location: storage, Path: mountPath, Create: create, Class: class}
j := self.jobs[jobId]
if create {
for _, vol := range self.jobs[jobId].Bindings {
base, relpath := pathMatch(vol.ContainerPath, mountPath)
if len(base) > 0 {
if directory {
if class == "Directory" {
os.MkdirAll(path.Join(vol.HostPath, relpath), 0777)
} else {
} else if class == "File" {
ioutil.WriteFile(path.Join(vol.HostPath, relpath), []byte{}, 0777)
} else {
return fmt.Errorf("Unknown class type: %s", class)
}
}
}
Expand Down Expand Up @@ -135,6 +137,6 @@ func (self *FileMapper) TempFile(jobId string) (f *os.File, err error) {
func (self *FileMapper) FinalizeJob(jobId string) {
for _, out := range self.jobs[jobId].Outputs {
hst := self.HostPath(jobId, out.Path)
self.fileSystem.Put(out.Location, hst, out.Directory)
self.fileSystem.Put(out.Location, hst, out.Class)
}
}
8 changes: 5 additions & 3 deletions src/ga4gh-engine/worker/fs_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,26 @@ func (self *FileStorageAccess) Get(storage string, hostPath string) error {
return nil
}

func (self *FileStorageAccess) Put(location string, hostPath string, directory bool) error {
func (self *FileStorageAccess) Put(location string, hostPath string, class string) error {

storage := strings.TrimPrefix(location, "fs://")

log.Printf("copy out %s %s\n", hostPath, path.Join(self.StorageDir, storage))
//copy to storage directory
if directory {
if class == "Directory" {
err := CopyDir(hostPath, path.Join(self.StorageDir, storage))
if err != nil {
log.Printf("Error copying output directory %s to %s", hostPath, location)
return err
}
} else {
} else if class == "File" {
err := CopyFile(hostPath, path.Join(self.StorageDir, storage))
if err != nil {
log.Printf("Error copying output file %s to %s", hostPath, location)
return err
}
} else {
return fmt.Errorf("Unknown Class type: %s", class)
}
return nil
}
21 changes: 13 additions & 8 deletions src/ga4gh-engine/worker/swift_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (self *SwiftAccess) Get(storage string, hostPath string) error {
return nil
}

func (self *SwiftAccess) Put(storage string, hostPath string, directory bool) error {
func (self *SwiftAccess) Put(storage string, hostPath string, class string) error {
log.Printf("Starting upload of %s", storage)
content, err := os.Open(hostPath)
if err != nil {
Expand All @@ -77,11 +77,16 @@ func (self *SwiftAccess) Put(storage string, hostPath string, directory bool) er

storage = strings.TrimPrefix(storage, SWIFT_PROTOCOL)
storage_split := strings.SplitN(storage, "/", 2)

// Now execute the upload
opts := objects.CreateOpts{}
res := objects.Create(self.client, storage_split[0], storage_split[1], content, opts)
_, err = res.ExtractHeader()
content.Close()
return err

if class == "File" {
// Now execute the upload
opts := objects.CreateOpts{}
res := objects.Create(self.client, storage_split[0], storage_split[1], content, opts)
_, err = res.ExtractHeader()
content.Close()
return err
} else if class == "Directory" {
return fmt.Errorf("SWIFT directories not yet supported")
}
return fmt.Errorf("Unknown element type: %s", class)
}
2 changes: 2 additions & 0 deletions tests/test_fileop.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ def test_file_mount(self):
"name" : "infile",
"description" : "File to be MD5ed",
"location" : in_loc,
"class" : "File",
"path" : "/tmp/test_file"
}
],
"outputs" : [
{
"location" : out_loc,
"class" : "File",
"path" : "/tmp/test_out"
}
],
Expand Down

0 comments on commit 9ea6f1b

Please sign in to comment.