Skip to content

Commit

Permalink
running go fmt on some of the code projects
Browse files Browse the repository at this point in the history
  • Loading branch information
kellrott committed Aug 23, 2016
1 parent 2799557 commit 8f32110
Show file tree
Hide file tree
Showing 9 changed files with 268 additions and 230 deletions.
12 changes: 5 additions & 7 deletions src/ga4gh-engine/scheduler.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@

package ga4gh_taskengine

import (
"ga4gh-tasks"
"ga4gh-engine/scaling"
"golang.org/x/net/context"
"ga4gh-server/proto"
"ga4gh-tasks"
"golang.org/x/net/context"
//"log"
)

Expand All @@ -14,25 +14,23 @@ type TaskDB interface {
}

func Scheduler(task_server TaskDB, scaler ga4gh_engine_scaling.Scaler) *TaskScheduler {
return &TaskScheduler{task_server:task_server, scaler:scaler}
return &TaskScheduler{task_server: task_server, scaler: scaler}
}

type TaskScheduler struct {
task_server TaskDB
scaler ga4gh_engine_scaling.Scaler
scaler ga4gh_engine_scaling.Scaler
}

func (self *TaskScheduler) WorkerPing(ctx context.Context, info *ga4gh_task_ref.WorkerInfo) (*ga4gh_task_ref.WorkerInfo, error) {
self.scaler.PingReceived(info)
return info, nil
}


func (self *TaskScheduler) GetJobToRun(ctx context.Context, request *ga4gh_task_ref.JobRequest) (*ga4gh_task_ref.JobResponse, error) {
return self.task_server.GetJobToRun(ctx, request)
}


func (self *TaskScheduler) UpdateJobStatus(ctx context.Context, stat *ga4gh_task_ref.UpdateStatusRequest) (*ga4gh_task_exec.JobId, error) {
return self.task_server.UpdateJobStatus(ctx, stat)
}
Expand Down
36 changes: 19 additions & 17 deletions src/ga4gh-engine/worker/container.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@

package ga4gh_taskengine_worker

import (
"os"
"github.com/fsouza/go-dockerclient"
"log"
"os"
"strings"
"github.com/fsouza/go-dockerclient"
)


type ContainerManager interface {
Run(container string, args []string, binds[] string, remove bool, stdout_path *string, stderr_path *string) error
Run(container string, args []string, binds []string, workdir string, remove bool, stdout_path *string, stderr_path *string) error
}

type DockerDirect struct {
Expand All @@ -23,16 +21,20 @@ func NewDockerDirect() *DockerDirect {
log.Printf("Docker Error\n")
return nil
}
return &DockerDirect{ client:client }
return &DockerDirect{client: client}
}

func (self *DockerDirect) Run(containerName string, args []string, binds[] string, remove bool, stdout *os.File, stderr *os.File) (int, error) {
func (self *DockerDirect) Run(containerName string, args []string, binds []string, workdir string, remove bool, stdout *os.File, stderr *os.File) (int, error) {

create_config := docker.Config{
Image:containerName,
Cmd:args,
AttachStderr:true,
AttachStdout:true,
Image: containerName,
Cmd: args,
AttachStderr: true,
AttachStdout: true,
}

if len(workdir) > 0 {
create_config.WorkingDir = workdir
}

if _, ok := self.client.InspectImage(containerName); ok != nil {
Expand All @@ -43,8 +45,8 @@ func (self *DockerDirect) Run(containerName string, args []string, binds[] strin
if len(tmp) > 1 {
tag = tmp[1]
}
pull_opt := docker.PullImageOptions{ Repository: rep, Tag: tag }
if ok := self.client.PullImage( pull_opt, docker.AuthConfiguration{} ); ok != nil {
pull_opt := docker.PullImageOptions{Repository: rep, Tag: tag}
if ok := self.client.PullImage(pull_opt, docker.AuthConfiguration{}); ok != nil {
log.Printf("Image not pulled: %s", ok)
return -1, ok
}
Expand All @@ -60,7 +62,7 @@ func (self *DockerDirect) Run(containerName string, args []string, binds[] strin
}

log.Printf("Starting Docker (mount: %s): %s", strings.Join(binds, ","), strings.Join(args, " "))
err = self.client.StartContainer(container.ID, &docker.HostConfig {
err = self.client.StartContainer(container.ID, &docker.HostConfig{
Binds: binds,
})

Expand All @@ -72,7 +74,7 @@ func (self *DockerDirect) Run(containerName string, args []string, binds[] strin
log.Printf("Attaching Container: %s", container.ID)
exit_code, err := self.client.WaitContainer(container.ID)

logOpts := docker.LogsOptions{Container:container.ID, Stdout:false, Stderr:false}
logOpts := docker.LogsOptions{Container: container.ID, Stdout: false, Stderr: false}

if stdout != nil {
logOpts.Stdout = true
Expand All @@ -89,6 +91,6 @@ func (self *DockerDirect) Run(containerName string, args []string, binds[] strin
} else {
log.Printf("docker %s complete", container.ID, err)
}
self.client.RemoveContainer(docker.RemoveContainerOptions{ID:container.ID,RemoveVolumes:true})
self.client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, RemoveVolumes: true})
return exit_code, nil
}
}
13 changes: 6 additions & 7 deletions src/ga4gh-engine/worker/engine.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package ga4gh_taskengine_worker

import (
"os"
"fmt"
"ga4gh-tasks"
"os"
)


const HEADER_SIZE = int64(102400)

func read_file_head(path string) []byte {
f, _ := os.Open(path)
buffer := make([]byte, HEADER_SIZE)
Expand All @@ -16,7 +16,6 @@ func read_file_head(path string) []byte {
return buffer[:l]
}


func RunJob(job *ga4gh_task_exec.Job, mapper FileMapper) error {

mapper.Job(job.JobId)
Expand All @@ -32,8 +31,8 @@ func RunJob(job *ga4gh_task_exec.Job, mapper FileMapper) error {
}
}

for _, output := range(job.Task.Outputs) {
err := mapper.MapOutput(job.JobId, output.Location, output.Path, output.Directory)
for _, output := range job.Task.Outputs {
err := mapper.MapOutput(job.JobId, output.Location, output.Path, output.Directory, output.Create)
if err != nil {
return err
}
Expand All @@ -58,7 +57,7 @@ func RunJob(job *ga4gh_task_exec.Job, mapper FileMapper) error {
binds := mapper.GetBindings(job.JobId)

dclient := NewDockerDirect()
exit_code, err := dclient.Run(dockerTask.ImageName, dockerTask.Cmd, binds, true, stdout, stderr)
exit_code, err := dclient.Run(dockerTask.ImageName, dockerTask.Cmd, binds, dockerTask.Workdir, true, stdout, stderr)
stdout.Close()
stderr.Close()

Expand Down Expand Up @@ -89,4 +88,4 @@ func RunJob(job *ga4gh_task_exec.Job, mapper FileMapper) error {
mapper.FinalizeJob(job.JobId)

return nil
}
}
94 changes: 50 additions & 44 deletions src/ga4gh-engine/worker/file_client.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,22 @@


package ga4gh_taskengine_worker

import (
"os"
"strings"
"ga4gh-tasks"
"fmt"
"log"
"io/ioutil"
"path"
"ga4gh-server/proto"
"ga4gh-tasks"
"golang.org/x/net/context"
"io/ioutil"
"log"
"os"
"path"
"strings"
)


type FileMapper interface {
Job(jobId string)
AddVolume(jobId string, source string, mount string)
MapInput(jobId string, storagePath string, localPath string, directory bool) error
MapOutput(jobId string, storagePath string, localPath string, directory bool) error
MapOutput(jobId string, storagePath string, localPath string, directory bool, create bool) error

HostPath(jobId string, mountPath string) string

Expand All @@ -35,14 +32,12 @@ type EngineStatus struct {
ActiveJobs int32
}


type FSBinding struct {
HostPath string
HostPath string
ContainerPath string
Mode string
Mode string
}


func NewSharedFS(client *ga4gh_task_ref.SchedulerClient, storageDir string, volumeDir string) *SharedFileMapper {
if _, err := os.Stat(storageDir); os.IsNotExist(err) {
os.Mkdir(storageDir, 0700)
Expand All @@ -51,21 +46,21 @@ func NewSharedFS(client *ga4gh_task_ref.SchedulerClient, storageDir string, volu
os.Mkdir(volumeDir, 0700)
}

return &SharedFileMapper{StorageDir: storageDir, VolumeDir: volumeDir, jobs: make(map[string]*JobSharedFileMapper), client:client}
return &SharedFileMapper{StorageDir: storageDir, VolumeDir: volumeDir, jobs: make(map[string]*JobSharedFileMapper), client: client}
}

type JobSharedFileMapper struct {
JobId string
WorkDir string
JobId string
WorkDir string
Bindings []FSBinding
Outputs []ga4gh_task_exec.TaskParameter
Outputs []ga4gh_task_exec.TaskParameter
}

type SharedFileMapper struct {
StorageDir string
VolumeDir string
client *ga4gh_task_ref.SchedulerClient
jobs map[string]*JobSharedFileMapper
VolumeDir string
client *ga4gh_task_ref.SchedulerClient
jobs map[string]*JobSharedFileMapper
}

func (self *SharedFileMapper) Job(jobId string) {
Expand All @@ -74,30 +69,28 @@ func (self *SharedFileMapper) Job(jobId string) {
if _, err := os.Stat(w); err != nil {
os.Mkdir(w, 0700)
}
a := JobSharedFileMapper{JobId:jobId, WorkDir:w}
a := JobSharedFileMapper{JobId: jobId, WorkDir: w}
self.jobs[jobId] = &a
}

func (self *SharedFileMapper) AddVolume(jobId string, source string, mount string) {
tmpPath, _ := ioutil.TempDir(self.VolumeDir, fmt.Sprintf("job_%s", jobId))
b := FSBinding {
HostPath: tmpPath,
b := FSBinding{
HostPath: tmpPath,
ContainerPath: mount,
Mode: "rw",
Mode: "rw",
}
j := self.jobs[jobId]
j.Bindings = append(j.Bindings, b)
}



func pathMatch(base string, query string) (string, string) {
if path.Clean(base) == path.Clean(query) {
return query, ""
}
dir, file := path.Split(query)
if len(dir) > 1 {
d, p := pathMatch(base, dir)
d, p := pathMatch(base, dir)
return d, path.Join(p, file)
}
return "", ""
Expand All @@ -123,21 +116,32 @@ func (self *SharedFileMapper) MapInput(jobId string, storage string, mountPath s
for _, vol := range self.jobs[jobId].Bindings {
base, relpath := pathMatch(vol.ContainerPath, mountPath)
if len(base) > 0 {
fmt.Printf("cp %s %s\n", srcPath, path.Join(vol.HostPath, relpath) )
copyFileContents(srcPath, path.Join(vol.HostPath, relpath) )
fmt.Printf("cp %s %s\n", srcPath, path.Join(vol.HostPath, relpath))
copyFileContents(srcPath, path.Join(vol.HostPath, relpath))
}
}
return nil
}

func (self *SharedFileMapper) MapOutput(jobId string, storage string, mountPath string, directory bool) error {
a := ga4gh_task_exec.TaskParameter{Location:storage, Path:mountPath}
func (self *SharedFileMapper) MapOutput(jobId string, storage string, mountPath string, directory bool, create bool) error {
a := ga4gh_task_exec.TaskParameter{Location: storage, Path: mountPath, Create: create, Directory: directory}
j := self.jobs[jobId]
if create {
for _, vol := range self.jobs[jobId].Bindings {
base, relpath := pathMatch(vol.ContainerPath, mountPath)
if len(base) > 0 {
if directory {
os.MkdirAll(path.Join(vol.HostPath, relpath), 0777)
} else {
ioutil.WriteFile(path.Join(vol.HostPath, relpath), []byte{}, 0777)
}
}
}
}
j.Outputs = append(j.Outputs, a)
return nil
}


func (self *SharedFileMapper) GetBindings(jobId string) []string {
out := make([]string, 0, 10)
for _, c := range self.jobs[jobId].Bindings {
Expand All @@ -147,31 +151,33 @@ func (self *SharedFileMapper) GetBindings(jobId string) []string {
return out
}


func (self *SharedFileMapper) UpdateOutputs(jobId string, jobNum int, exitCode int, stdoutText string, stderrText string) {
log := ga4gh_task_exec.JobLog{Stdout:stdoutText, Stderr:stderrText, ExitCode:int32(exitCode)}
a := ga4gh_task_ref.UpdateStatusRequest{Id:jobId, Step:int64(jobNum), Log:&log }
log := ga4gh_task_exec.JobLog{Stdout: stdoutText, Stderr: stderrText, ExitCode: int32(exitCode)}
a := ga4gh_task_ref.UpdateStatusRequest{Id: jobId, Step: int64(jobNum), Log: &log}
(*self.client).UpdateJobStatus(context.Background(), &a)
}


func (self *SharedFileMapper) TempFile(jobId string) (f *os.File, err error) {
out, err := ioutil.TempFile(self.jobs[jobId].WorkDir, "ga4ghtask_")
return out, err
}



func (self *SharedFileMapper) FinalizeJob(jobId string) {
for _, out := range self.jobs[jobId].Outputs {
hst := self.HostPath(jobId, out.Path)
storage := strings.TrimPrefix(out.Location, "fs://")
fmt.Printf("copy out %s %s\n", hst, path.Join(self.StorageDir, storage))
log.Printf("copy out %s %s (%#v)\n", hst, path.Join(self.StorageDir, storage), out)
//copy to storage directory
err := CopyFile(hst, path.Join(self.StorageDir, storage))
if err != nil {
log.Printf("Error copying output %s to %s", hst, out.Location)
if out.Directory {
err := CopyDir(hst, path.Join(self.StorageDir, storage))
if err != nil {
log.Printf("Error copying output directory %s to %s", hst, out.Location)
}
} else {
err := CopyFile(hst, path.Join(self.StorageDir, storage))
if err != nil {
log.Printf("Error copying output file %s to %s", hst, out.Location)
}
}
}
}

Loading

0 comments on commit 8f32110

Please sign in to comment.