Skip to content

Commit

Permalink
Refactoring task file mapper to support multiple backends. Started wo…
Browse files Browse the repository at this point in the history
…rking on the swift file client. Added new docker exection client based on official docker client (original client stopped workin on latest docker update)
  • Loading branch information
kellrott committed Aug 25, 2016
1 parent 8f32110 commit 01f90a7
Show file tree
Hide file tree
Showing 9 changed files with 305 additions and 92 deletions.
11 changes: 4 additions & 7 deletions src/ga4gh-engine/scaling/mapping.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,22 @@

package ga4gh_engine_scaling

import (
"ga4gh-tasks"
"ga4gh-server/proto"
"ga4gh-tasks"
)


type Scaler interface {
JobAdded(*ga4gh_task_exec.Resources)
PingReceived(*ga4gh_task_ref.WorkerInfo)
}

type ScalerInit func(map[string]string) Scaler


var ScalingMethods = map[string]ScalerInit{
"local" : NewLocalScaler,
"local": NewLocalScaler,
}

type LocalScaler struct {

}

func NewLocalScaler(config map[string]string) Scaler {
Expand All @@ -34,4 +31,4 @@ func (self LocalScaler) JobAdded(request *ga4gh_task_exec.Resources) {

func (self LocalScaler) PingReceived(worker *ga4gh_task_ref.WorkerInfo) {
//do something here
}
}
3 changes: 3 additions & 0 deletions src/ga4gh-engine/worker/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func (self *DockerDirect) Run(containerName string, args []string, binds []strin

log.Printf("Attaching Container: %s", container.ID)
exit_code, err := self.client.WaitContainer(container.ID)
if err != nil {
log.Printf("Docker run Error: %s", err)
}

logOpts := docker.LogsOptions{Container: container.ID, Stdout: false, Stderr: false}

Expand Down
108 changes: 108 additions & 0 deletions src/ga4gh-engine/worker/container_engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package ga4gh_taskengine_worker

import (
"github.com/docker/engine-api/client"
"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/container"
"github.com/docker/engine-api/types/network"
"golang.org/x/net/context"
"io"
"log"
"os"
"strings"
)

type DockerEngine struct {
client *client.Client
}

func NewDockerEngine() *DockerEngine {
client, err := client.NewEnvClient()
if err != nil {
log.Printf("Docker Error\n")
return nil
}
return &DockerEngine{client: client}
}

func (self *DockerEngine) Run(containerName string, args []string,
binds []string, workdir string, remove bool, stdout *os.File, stderr *os.File) (int, error) {

list, err := self.client.ImageList(context.Background(), types.ImageListOptions{MatchName: containerName})

if err != nil || len(list) == 0 {
log.Printf("Image %s not found", containerName)
pull_opt := types.ImagePullOptions{}
r, err := self.client.ImagePull(context.Background(), containerName, pull_opt)
if err != nil {
log.Printf("Image not pulled: %s", err)
return -1, err
}
for {
l := make([]byte, 1000)
_, e := r.Read(l)
if e == io.EOF {
break
}
log.Printf("%s", l)
}
r.Close()
log.Printf("Image Pulled")
}

container, err := self.client.ContainerCreate(context.Background(),
&container.Config{Cmd: args, Image: containerName, Tty: true},
&container.HostConfig{Binds: binds},
&network.NetworkingConfig{},
"",
)

if err != nil {
log.Printf("Docker run Error: %s", err)
return 0, err
}

log.Printf("Starting Docker (mount: %s): %s", strings.Join(binds, ","), strings.Join(args, " "))
err = self.client.ContainerStart(context.Background(), container.ID, types.ContainerStartOptions{})

if err != nil {
log.Printf("Docker run Error: %s", err)
return 0, err
}

log.Printf("Attaching Container: %s", container.ID)
exit_code, err := self.client.ContainerWait(context.Background(), container.ID)
if err != nil {
log.Printf("docker %s error: %s", container.ID, err)
} else {
log.Printf("docker %s complete", container.ID, err)
}

if stdout != nil {
stdout_log, _ := self.client.ContainerLogs(context.Background(), container.ID, types.ContainerLogsOptions{ShowStdout: true, Details: false})
buffer := make([]byte, 10240)
for {
l, e := stdout_log.Read(buffer)
if e == io.EOF {
break
}
stdout.Write(buffer[:l])
}
stdout_log.Close()
}

if stderr != nil {
stderr_log, _ := self.client.ContainerLogs(context.Background(), container.ID, types.ContainerLogsOptions{ShowStderr: true})
buffer := make([]byte, 10240)
for {
l, e := stderr_log.Read(buffer)
if e == io.EOF {
break
}
stderr.Write(buffer[:l])
}
stderr_log.Close()
}
self.client.ContainerRemove(context.Background(), container.ID, types.ContainerRemoveOptions{RemoveVolumes: true})
return exit_code, nil
}
2 changes: 1 addition & 1 deletion src/ga4gh-engine/worker/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func RunJob(job *ga4gh_task_exec.Job, mapper FileMapper) error {
}
binds := mapper.GetBindings(job.JobId)

dclient := NewDockerDirect()
dclient := NewDockerEngine()
exit_code, err := dclient.Run(dockerTask.ImageName, dockerTask.Cmd, binds, dockerTask.Workdir, true, stdout, stderr)
stdout.Close()
stderr.Close()
Expand Down
108 changes: 31 additions & 77 deletions src/ga4gh-engine/worker/file_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,27 @@ import (
"ga4gh-tasks"
"golang.org/x/net/context"
"io/ioutil"
"log"
"os"
"path"
"strings"
)

type FileMapper interface {
Job(jobId string)
AddVolume(jobId string, source string, mount string)
MapInput(jobId string, storagePath string, localPath string, directory bool) error
MapOutput(jobId string, storagePath string, localPath string, directory bool, create bool) error

HostPath(jobId string, mountPath string) string
type FileMapper struct {
fileSystem FileSystemAccess
VolumeDir string
client *ga4gh_task_ref.SchedulerClient
jobs map[string]*JobFileMapper
}

TempFile(jobId string) (f *os.File, err error)
GetBindings(jobId string) []string
UpdateOutputs(jobId string, stepNum int, exit_code int, stdoutText string, stderrText string)
type JobFileMapper struct {
JobId string
WorkDir string
Bindings []FSBinding
Outputs []ga4gh_task_exec.TaskParameter
}

FinalizeJob(jobId string)
type FileSystemAccess interface {
Get(storage string, path string) error
Put(storage string, path string, directory bool) error
}

type EngineStatus struct {
Expand All @@ -38,42 +40,24 @@ type FSBinding struct {
Mode string
}

func NewSharedFS(client *ga4gh_task_ref.SchedulerClient, storageDir string, volumeDir string) *SharedFileMapper {
if _, err := os.Stat(storageDir); os.IsNotExist(err) {
os.Mkdir(storageDir, 0700)
}
func NewFileMapper(client *ga4gh_task_ref.SchedulerClient, fileSystem FileSystemAccess, volumeDir string) *FileMapper {
if _, err := os.Stat(volumeDir); os.IsNotExist(err) {
os.Mkdir(volumeDir, 0700)
}

return &SharedFileMapper{StorageDir: storageDir, VolumeDir: volumeDir, jobs: make(map[string]*JobSharedFileMapper), client: client}
}

type JobSharedFileMapper struct {
JobId string
WorkDir string
Bindings []FSBinding
Outputs []ga4gh_task_exec.TaskParameter
}

type SharedFileMapper struct {
StorageDir string
VolumeDir string
client *ga4gh_task_ref.SchedulerClient
jobs map[string]*JobSharedFileMapper
return &FileMapper{VolumeDir: volumeDir, jobs: make(map[string]*JobFileMapper), client: client, fileSystem: fileSystem}
}

func (self *SharedFileMapper) Job(jobId string) {
func (self *FileMapper) Job(jobId string) {
//create a working 'disk' for runtime files
w := path.Join(self.VolumeDir, jobId)
if _, err := os.Stat(w); err != nil {
os.Mkdir(w, 0700)
}
a := JobSharedFileMapper{JobId: jobId, WorkDir: w}
a := JobFileMapper{JobId: jobId, WorkDir: w}
self.jobs[jobId] = &a
}

func (self *SharedFileMapper) AddVolume(jobId string, source string, mount string) {
func (self *FileMapper) AddVolume(jobId string, source string, mount string) {
tmpPath, _ := ioutil.TempDir(self.VolumeDir, fmt.Sprintf("job_%s", jobId))
b := FSBinding{
HostPath: tmpPath,
Expand All @@ -84,19 +68,7 @@ func (self *SharedFileMapper) AddVolume(jobId string, source string, mount strin
j.Bindings = append(j.Bindings, b)
}

func pathMatch(base string, query string) (string, string) {
if path.Clean(base) == path.Clean(query) {
return query, ""
}
dir, file := path.Split(query)
if len(dir) > 1 {
d, p := pathMatch(base, dir)
return d, path.Join(p, file)
}
return "", ""
}

func (self *SharedFileMapper) HostPath(jobId string, mountPath string) string {
func (self *FileMapper) HostPath(jobId string, mountPath string) string {
for _, vol := range self.jobs[jobId].Bindings {
base, relpath := pathMatch(vol.ContainerPath, mountPath)
if len(base) > 0 {
Expand All @@ -106,24 +78,19 @@ func (self *SharedFileMapper) HostPath(jobId string, mountPath string) string {
return ""
}

func (self *SharedFileMapper) MapInput(jobId string, storage string, mountPath string, directory bool) error {
storage = strings.TrimPrefix(storage, "fs://")
srcPath := path.Join(self.StorageDir, storage)
if _, err := os.Stat(srcPath); os.IsNotExist(err) {
return fmt.Errorf("storage file '%s' not found", srcPath)
}

func (self *FileMapper) MapInput(jobId string, storage string, mountPath string, directory bool) error {
for _, vol := range self.jobs[jobId].Bindings {
base, relpath := pathMatch(vol.ContainerPath, mountPath)
if len(base) > 0 {
fmt.Printf("cp %s %s\n", srcPath, path.Join(vol.HostPath, relpath))
copyFileContents(srcPath, path.Join(vol.HostPath, relpath))
dstPath := path.Join(vol.HostPath, relpath)
fmt.Printf("get %s %s\n", storage, dstPath)
self.fileSystem.Get(storage, dstPath)
}
}
return nil
}

func (self *SharedFileMapper) MapOutput(jobId string, storage string, mountPath string, directory bool, create bool) error {
func (self *FileMapper) MapOutput(jobId string, storage string, mountPath string, directory bool, create bool) error {
a := ga4gh_task_exec.TaskParameter{Location: storage, Path: mountPath, Create: create, Directory: directory}
j := self.jobs[jobId]
if create {
Expand All @@ -142,7 +109,7 @@ func (self *SharedFileMapper) MapOutput(jobId string, storage string, mountPath
return nil
}

func (self *SharedFileMapper) GetBindings(jobId string) []string {
func (self *FileMapper) GetBindings(jobId string) []string {
out := make([]string, 0, 10)
for _, c := range self.jobs[jobId].Bindings {
o := fmt.Sprintf("%s:%s:%s", c.HostPath, c.ContainerPath, c.Mode)
Expand All @@ -151,33 +118,20 @@ func (self *SharedFileMapper) GetBindings(jobId string) []string {
return out
}

func (self *SharedFileMapper) UpdateOutputs(jobId string, jobNum int, exitCode int, stdoutText string, stderrText string) {
func (self *FileMapper) UpdateOutputs(jobId string, jobNum int, exitCode int, stdoutText string, stderrText string) {
log := ga4gh_task_exec.JobLog{Stdout: stdoutText, Stderr: stderrText, ExitCode: int32(exitCode)}
a := ga4gh_task_ref.UpdateStatusRequest{Id: jobId, Step: int64(jobNum), Log: &log}
(*self.client).UpdateJobStatus(context.Background(), &a)
}

func (self *SharedFileMapper) TempFile(jobId string) (f *os.File, err error) {
func (self *FileMapper) TempFile(jobId string) (f *os.File, err error) {
out, err := ioutil.TempFile(self.jobs[jobId].WorkDir, "ga4ghtask_")
return out, err
}

func (self *SharedFileMapper) FinalizeJob(jobId string) {
func (self *FileMapper) FinalizeJob(jobId string) {
for _, out := range self.jobs[jobId].Outputs {
hst := self.HostPath(jobId, out.Path)
storage := strings.TrimPrefix(out.Location, "fs://")
log.Printf("copy out %s %s (%#v)\n", hst, path.Join(self.StorageDir, storage), out)
//copy to storage directory
if out.Directory {
err := CopyDir(hst, path.Join(self.StorageDir, storage))
if err != nil {
log.Printf("Error copying output directory %s to %s", hst, out.Location)
}
} else {
err := CopyFile(hst, path.Join(self.StorageDir, storage))
if err != nil {
log.Printf("Error copying output file %s to %s", hst, out.Location)
}
}
self.fileSystem.Put(out.Location, hst, out.Directory)
}
}
Loading

0 comments on commit 01f90a7

Please sign in to comment.