diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..aeb52fc --- /dev/null +++ b/.travis.yml @@ -0,0 +1,17 @@ +language: python +sudo: required +services: + - docker +python: + - "2.7" +os: + - linux +before_install: + - sudo apt-get update + - sudo apt-get install -y golang +install: + - make depends + - make +script: nosetests tests +notifications: + email: false \ No newline at end of file diff --git a/Makefile b/Makefile index cfb8ced..a830f7d 100644 --- a/Makefile +++ b/Makefile @@ -4,21 +4,21 @@ export GOPATH PATH := ${PATH}:$(shell pwd)/bin export PATH -PROTO_INC= -I ./ -I $(GOPATH)/src/github.com/gengo/grpc-gateway/third_party/googleapis/ +PROTO_INC= -I ./ -I $(GOPATH)/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis/ server: - go install ga4gh-taskserver - go install ga4gh-worker + go install tes-server + go install tes-worker proto_build: cd task-execution-schemas/proto && protoc $(PROTO_INC) \ - --go_out=Mgoogle/api/annotations.proto=github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis/google/api,plugins=grpc:../../src/ga4gh-tasks/ \ - --grpc-gateway_out=logtostderr=true:../../src/ga4gh-tasks/ \ + --go_out=Mgoogle/api/annotations.proto=github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis/google/api,plugins=grpc:../../src/tes/ga4gh/ \ + --grpc-gateway_out=logtostderr=true:../../src/tes/ga4gh/ \ task_execution.proto cd proto && protoc \ $(PROTO_INC) \ -I ../task-execution-schemas/proto/ \ - --go_out=Mtask_execution.proto=ga4gh-tasks,plugins=grpc:../src/ga4gh-server/proto \ + --go_out=Mtask_execution.proto=tes/ga4gh,plugins=grpc:../src/tes/server/proto \ task_worker.proto grpc: @@ -27,6 +27,6 @@ grpc: go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway depends: grpc - go get -d ga4gh-taskserver/ - go get -d ga4gh-worker/ + go get -d tes-server/ + go get -d tes-worker/ \ No newline at end of file diff --git a/README.md b/README.md index a90da92..ba157f8 100644 --- a/README.md +++ b/README.md @@ -13,12 +13,12 @@ make ## Start task server ``` -./bin/ga4gh-taskserver +./bin/tes-server ``` ## Start worker ``` -./bin/ga4gh-worker +./bin/tes-worker ``` ## Get info about task execution service diff --git a/bin/tes-runner.py b/bin/tes-runner.py new file mode 100755 index 0000000..25b9347 --- /dev/null +++ b/bin/tes-runner.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python + +import json +import time +import requests +import argparse + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("-s", "--server", default="http://localhost:8000") + parser.add_argument("task") + args = parser.parse_args() + + with open(args.task) as handle: + task = json.loads(handle.read()) + + r = requests.post("%s/v1/jobs" % (args.server), json=task) + data = r.json() + print data + job_id = data['value'] + + for i in range(10): + r = requests.get("%s/v1/jobs/%s" % (args.server, job_id)) + data = r.json() + if data["state"] not in ['Queued', "Running"]: + break + time.sleep(1) + print data + + + diff --git a/deploy-ubuntu.sh b/deploy-ubuntu.sh new file mode 100644 index 0000000..751d786 --- /dev/null +++ b/deploy-ubuntu.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +sudo apt-get update +sudo apt install -y golang-go + +git clone https://github.com/bmeg/task-execution-server.git +git checkout scaling + +make depends +make diff --git a/share/app.js b/share/app.js new file mode 100644 index 0000000..e899262 --- /dev/null +++ b/share/app.js @@ -0,0 +1,56 @@ +app = angular.module('TESApp', ['ngRoute']) + +app.controller('JobListController', function($scope, $http) { + "use strict"; + + $scope.url = "/v1/jobs"; + $scope.tasks = []; + + $scope.fetchContent = function() { + $http.get($scope.url).then(function(result){ + $scope.jobs = result.data.jobs; + }); + } + + $scope.fetchContent(); +}); + +app.controller('WorkerListController', function($scope, $http) { + "use strict"; + + $scope.url = "/v1/jobs-service"; + $scope.workers = []; + + $scope.fetchContent = function() { + $http.get($scope.url).then(function(result){ + $scope.workers = result.data; + }); + } + + $scope.fetchContent(); +}); + +app.controller('JobInfoController', + function($scope, $http, $routeParams) { + $scope.url = "/v1/jobs/" + $routeParams.job_id + + $scope.job_info = {}; + $scope.fetchContent = function() { + $http.get($scope.url).then(function(result){ + $scope.job_info = result.data + }) + } + $scope.fetchContent(); + } +); + +app.config(['$routeProvider', + function($routeProvider) { + $routeProvider.when('/', { + templateUrl: 'static/list.html', + }). + when('/jobs/:job_id', { + templateUrl: 'static/jobs.html' + }) + } +]); \ No newline at end of file diff --git a/share/index.html b/share/index.html new file mode 100644 index 0000000..a9c8259 --- /dev/null +++ b/share/index.html @@ -0,0 +1,15 @@ + + + + + + + + +
+

TES

+
+
+
+ + \ No newline at end of file diff --git a/share/jobs.html b/share/jobs.html new file mode 100644 index 0000000..a1c8cc1 --- /dev/null +++ b/share/jobs.html @@ -0,0 +1,36 @@ +
+ <--- +

Job {{job_info.jobId}}

+ +
{{job_info.task.name}}
+
{{job_info.task.description}}
+ +
+
Commands
+
+
Container: {{item.imageName}}
+
Container: {{item.cmd}}
+
+
+ +
Requirements
+
+ {{item}} +
+ +
State: {{job_info.state}}
+ +
+
Logs
+
+
ExitCode
+
{{item.exitCode}}
+ +
STDERR
+
{{item.stderr}}
+ +
STDOUT
+
{{item.stdout}}
+
+
+
\ No newline at end of file diff --git a/share/list.html b/share/list.html new file mode 100644 index 0000000..e9185d7 --- /dev/null +++ b/share/list.html @@ -0,0 +1,12 @@ + +

Tasks

+
+ + + + + + + +
{{item.jobId}}{{item.state}}
+
diff --git a/share/tasks.html b/share/tasks.html new file mode 100644 index 0000000..27de329 --- /dev/null +++ b/share/tasks.html @@ -0,0 +1,41 @@ +
+<--- +

Task {{task_info.id}}

+ +
Command
+
{{task_info.command}}
+ +
Depends
+
{{task_info.task_depends}}
+ +
Container
+
{{task_info.container}}
+ +
Arguments
+
+ {{item}} +
+ +
Tags
+
+ {{item}} +
+ +
Requirements
+
+ {{item}} +
+ +
State
+
{{task_info.state}}
+ +
Max Retries
+
{{task_info.max_retry}}
+ + +
Jobs
+
+ {{item}} +
+ +
\ No newline at end of file diff --git a/src/ga4gh-engine/scaling/mapping.go b/src/ga4gh-engine/scaling/mapping.go deleted file mode 100644 index 194befc..0000000 --- a/src/ga4gh-engine/scaling/mapping.go +++ /dev/null @@ -1,37 +0,0 @@ - -package ga4gh_engine_scaling -import ( - "ga4gh-tasks" - "ga4gh-server/proto" -) - - -type Scaler interface { - JobAdded(*ga4gh_task_exec.Resources) - PingReceived(*ga4gh_task_ref.WorkerInfo) -} - -type ScalerInit func(map[string]string) Scaler - - -var ScalingMethods = map[string]ScalerInit{ - "local" : NewLocalScaler, -} - -type LocalScaler struct { - -} - -func NewLocalScaler(config map[string]string) Scaler { - return LocalScaler{} -} - -func (self LocalScaler) JobAdded(request *ga4gh_task_exec.Resources) { - //check for number of running workers - - //launch new worker if needed and possible -} - -func (self LocalScaler) PingReceived(worker *ga4gh_task_ref.WorkerInfo) { - //do something here -} \ No newline at end of file diff --git a/src/ga4gh-engine/scheduler.go b/src/ga4gh-engine/scheduler.go deleted file mode 100644 index f5a7d36..0000000 --- a/src/ga4gh-engine/scheduler.go +++ /dev/null @@ -1,42 +0,0 @@ - -package ga4gh_taskengine -import ( - "ga4gh-tasks" - "ga4gh-engine/scaling" - "golang.org/x/net/context" - "ga4gh-server/proto" - //"log" -) - -type TaskDB interface { - ga4gh_task_exec.TaskServiceServer - ga4gh_task_ref.SchedulerServer -} - -func Scheduler(task_server TaskDB, scaler ga4gh_engine_scaling.Scaler) *TaskScheduler { - return &TaskScheduler{task_server:task_server, scaler:scaler} -} - -type TaskScheduler struct { - task_server TaskDB - scaler ga4gh_engine_scaling.Scaler -} - -func (self *TaskScheduler) WorkerPing(ctx context.Context, info *ga4gh_task_ref.WorkerInfo) (*ga4gh_task_ref.WorkerInfo, error) { - self.scaler.PingReceived(info) - return info, nil -} - - -func (self *TaskScheduler) GetJobToRun(ctx context.Context, request *ga4gh_task_ref.JobRequest) (*ga4gh_task_ref.JobResponse, error) { - return self.task_server.GetJobToRun(ctx, request) -} - - -func (self *TaskScheduler) UpdateJobStatus(ctx context.Context, stat *ga4gh_task_ref.UpdateStatusRequest) (*ga4gh_task_exec.JobId, error) { - return self.task_server.UpdateJobStatus(ctx, stat) -} - -func (self *TaskScheduler) GetQueueInfo(request *ga4gh_task_ref.QueuedTaskInfoRequest, server ga4gh_task_ref.Scheduler_GetQueueInfoServer) error { - return self.task_server.GetQueueInfo(request, server) -} diff --git a/src/ga4gh-engine/worker/file_client.go b/src/ga4gh-engine/worker/file_client.go deleted file mode 100644 index 892f24b..0000000 --- a/src/ga4gh-engine/worker/file_client.go +++ /dev/null @@ -1,177 +0,0 @@ - - -package ga4gh_taskengine_worker - -import ( - "os" - "strings" - "ga4gh-tasks" - "fmt" - "log" - "io/ioutil" - "path" - "ga4gh-server/proto" - "golang.org/x/net/context" -) - - -type FileMapper interface { - Job(jobId string) - AddVolume(jobId string, source string, mount string) - MapInput(jobId string, storagePath string, localPath string, directory bool) error - MapOutput(jobId string, storagePath string, localPath string, directory bool) error - - HostPath(jobId string, mountPath string) string - - TempFile(jobId string) (f *os.File, err error) - GetBindings(jobId string) []string - UpdateOutputs(jobId string, stepNum int, exit_code int, stdoutText string, stderrText string) - - FinalizeJob(jobId string) -} - -type EngineStatus struct { - JobCount int32 - ActiveJobs int32 -} - - -type FSBinding struct { - HostPath string - ContainerPath string - Mode string -} - - -func NewSharedFS(client *ga4gh_task_ref.SchedulerClient, storageDir string, volumeDir string) *SharedFileMapper { - if _, err := os.Stat(storageDir); os.IsNotExist(err) { - os.Mkdir(storageDir, 0700) - } - if _, err := os.Stat(volumeDir); os.IsNotExist(err) { - os.Mkdir(volumeDir, 0700) - } - - return &SharedFileMapper{StorageDir: storageDir, VolumeDir: volumeDir, jobs: make(map[string]*JobSharedFileMapper), client:client} -} - -type JobSharedFileMapper struct { - JobId string - WorkDir string - Bindings []FSBinding - Outputs []ga4gh_task_exec.TaskParameter -} - -type SharedFileMapper struct { - StorageDir string - VolumeDir string - client *ga4gh_task_ref.SchedulerClient - jobs map[string]*JobSharedFileMapper -} - -func (self *SharedFileMapper) Job(jobId string) { - //create a working 'disk' for runtime files - w := path.Join(self.VolumeDir, jobId) - if _, err := os.Stat(w); err != nil { - os.Mkdir(w, 0700) - } - a := JobSharedFileMapper{JobId:jobId, WorkDir:w} - self.jobs[jobId] = &a -} - -func (self *SharedFileMapper) AddVolume(jobId string, source string, mount string) { - tmpPath, _ := ioutil.TempDir(self.VolumeDir, fmt.Sprintf("job_%s", jobId)) - b := FSBinding { - HostPath: tmpPath, - ContainerPath: mount, - Mode: "rw", - } - j := self.jobs[jobId] - j.Bindings = append(j.Bindings, b) -} - - - -func pathMatch(base string, query string) (string, string) { - if path.Clean(base) == path.Clean(query) { - return query, "" - } - dir, file := path.Split(query) - if len(dir) > 1 { - d, p := pathMatch(base, dir) - return d, path.Join(p, file) - } - return "", "" -} - -func (self *SharedFileMapper) HostPath(jobId string, mountPath string) string { - for _, vol := range self.jobs[jobId].Bindings { - base, relpath := pathMatch(vol.ContainerPath, mountPath) - if len(base) > 0 { - return path.Join(vol.HostPath, relpath) - } - } - return "" -} - -func (self *SharedFileMapper) MapInput(jobId string, storage string, mountPath string, directory bool) error { - storage = strings.TrimPrefix(storage, "fs://") - srcPath := path.Join(self.StorageDir, storage) - if _, err := os.Stat(srcPath); os.IsNotExist(err) { - return fmt.Errorf("storage file '%s' not found", srcPath) - } - - for _, vol := range self.jobs[jobId].Bindings { - base, relpath := pathMatch(vol.ContainerPath, mountPath) - if len(base) > 0 { - fmt.Printf("cp %s %s\n", srcPath, path.Join(vol.HostPath, relpath) ) - copyFileContents(srcPath, path.Join(vol.HostPath, relpath) ) - } - } - return nil -} - -func (self *SharedFileMapper) MapOutput(jobId string, storage string, mountPath string, directory bool) error { - a := ga4gh_task_exec.TaskParameter{Location:storage, Path:mountPath} - j := self.jobs[jobId] - j.Outputs = append(j.Outputs, a) - return nil -} - - -func (self *SharedFileMapper) GetBindings(jobId string) []string { - out := make([]string, 0, 10) - for _, c := range self.jobs[jobId].Bindings { - o := fmt.Sprintf("%s:%s:%s", c.HostPath, c.ContainerPath, c.Mode) - out = append(out, o) - } - return out -} - - -func (self *SharedFileMapper) UpdateOutputs(jobId string, jobNum int, exitCode int, stdoutText string, stderrText string) { - log := ga4gh_task_exec.JobLog{Stdout:stdoutText, Stderr:stderrText, ExitCode:int32(exitCode)} - a := ga4gh_task_ref.UpdateStatusRequest{Id:jobId, Step:int64(jobNum), Log:&log } - (*self.client).UpdateJobStatus(context.Background(), &a) -} - - -func (self *SharedFileMapper) TempFile(jobId string) (f *os.File, err error) { - out, err := ioutil.TempFile(self.jobs[jobId].WorkDir, "ga4ghtask_") - return out, err -} - - - -func (self *SharedFileMapper) FinalizeJob(jobId string) { - for _, out := range self.jobs[jobId].Outputs { - hst := self.HostPath(jobId, out.Path) - storage := strings.TrimPrefix(out.Location, "fs://") - fmt.Printf("copy out %s %s\n", hst, path.Join(self.StorageDir, storage)) - //copy to storage directory - err := CopyFile(hst, path.Join(self.StorageDir, storage)) - if err != nil { - log.Printf("Error copying output %s to %s", hst, out.Location) - } - } -} - diff --git a/src/ga4gh-engine/worker/fork_engine.go b/src/ga4gh-engine/worker/fork_engine.go deleted file mode 100644 index 095f4fc..0000000 --- a/src/ga4gh-engine/worker/fork_engine.go +++ /dev/null @@ -1,105 +0,0 @@ - -package ga4gh_taskengine_worker - -import ( - "time" - "log" - "sync/atomic" - context "golang.org/x/net/context" - "ga4gh-server/proto" - "os" - "ga4gh-tasks" - //proto "github.com/golang/protobuf/proto" -) - - -type ForkManager struct { - procCount int - running bool - files FileMapper - sched ga4gh_task_ref.SchedulerClient - workerId string - ctx context.Context - check_func func(status EngineStatus) - status EngineStatus -} - - - -func (self *ForkManager) worker(inchan chan ga4gh_task_exec.Job) { - for job := range inchan { - atomic.AddInt32(&self.status.ActiveJobs, 1) - atomic.AddInt32(&self.status.JobCount, 1) - log.Printf("Launch job: %s", job) - s := ga4gh_task_exec.State_Running - self.sched.UpdateJobStatus(self.ctx, &ga4gh_task_ref.UpdateStatusRequest{Id:job.JobId, State:s}) - err := RunJob(&job, self.files) - if err != nil { - log.Printf("Job error: %s", err) - self.sched.UpdateJobStatus(self.ctx, &ga4gh_task_ref.UpdateStatusRequest{Id:job.JobId, State:ga4gh_task_exec.State_Error}) - } else { - self.sched.UpdateJobStatus(self.ctx, &ga4gh_task_ref.UpdateStatusRequest{Id:job.JobId, State:ga4gh_task_exec.State_Complete}) - } - atomic.AddInt32(&self.status.ActiveJobs, -1) - } -} - -func (self *ForkManager) watcher(sched ga4gh_task_ref.SchedulerClient, filestore FileMapper) { - self.sched = sched - self.files = filestore - hostname, _ := os.Hostname() - jobchan := make(chan ga4gh_task_exec.Job, 10) - for i := 0; i < self.procCount; i++ { - go self.worker(jobchan) - } - var sleep_size int64 = 1 - for self.running { - if self.check_func != nil { - self.check_func(self.status) - } - task, err := self.sched.GetJobToRun(self.ctx, - &ga4gh_task_ref.JobRequest{ - Worker: &ga4gh_task_ref.WorkerInfo{ - Id:self.workerId, - Hostname:hostname, - LastPing:time.Now().Unix(), - }, - }) - if err != nil { - log.Print(err) - } - if task != nil && task.Job != nil { - sleep_size = 1 - log.Printf("Found job: %s", task) - jobchan <- *task.Job - } else { - //log.Printf("No jobs found") - if (sleep_size < 20) { - // sleep_size += 1 - } - time.Sleep(time.Second * time.Duration(sleep_size)) - } - } - close(jobchan) -} - -func (self *ForkManager) Start(engine ga4gh_task_ref.SchedulerClient, files FileMapper) { - go self.watcher(engine, files) -} - -func (self *ForkManager) Run(engine ga4gh_task_ref.SchedulerClient, files FileMapper) { - self.watcher(engine, files) -} - -func (self *ForkManager) SetStatusCheck( check_func func(status EngineStatus)) { - self.check_func = check_func -} - -func NewLocalManager(procCount int, workerId string) (*ForkManager, error) { - return &ForkManager{ - procCount:procCount, - running:true, - workerId:workerId, - ctx:context.Background(), - }, nil -} \ No newline at end of file diff --git a/src/ga4gh-server/task_interface.go b/src/ga4gh-server/task_interface.go deleted file mode 100644 index 7b0e604..0000000 --- a/src/ga4gh-server/task_interface.go +++ /dev/null @@ -1,48 +0,0 @@ - -package ga4gh_task - -import ( - //"golang.org/x/net/context" - //"ga4gh-tasks" -) - -type TaskImpl struct { -} - - -func NewTaskImpl() *TaskImpl { - return &TaskImpl{} -} - -/* - -func (self *TaskImpl) CreateTask(context.Context, *ga4gh_task_exec.Task) (*ga4gh_task_exec.Task, error) { - -} - -// / Delete a task -func (self *TaskImpl) DeleteTask(context.Context, *ga4gh_task_exec.TaskId) (*ga4gh_task_exec.TaskId, error) { - -} - -// / Get a task by its ID -func (self *TaskImpl) GetTask(context.Context, *ga4gh_task_exec.TaskId) (*ga4gh_task_exec.TaskId, error) { - -} - -// / Run a task -func (self *TaskImpl) RunTask(context.Context, *ga4gh_task_exec.TaskRunRequest) (*ga4gh_task_exec.TaskOpId, error) { - -} - -// / Get info about a running task -func (self *TaskImpl) GetTaskOp(context.Context, *ga4gh_task_exec.TaskOpId) (*ga4gh_task_exec.TaskOpId, error) { - -} - -// / Cancel a running task -func (self *TaskImpl) CancelTaskOp(context.Context, *ga4gh_task_exec.TaskOpId) (*ga4gh_task_exec.TaskOpId, error) { - -} - -*/ \ No newline at end of file diff --git a/src/ga4gh-taskserver/server.go b/src/tes-server/server.go similarity index 65% rename from src/ga4gh-taskserver/server.go rename to src/tes-server/server.go index d605db0..169279c 100644 --- a/src/ga4gh-taskserver/server.go +++ b/src/tes-server/server.go @@ -1,55 +1,49 @@ package main import ( - "os" - "fmt" "flag" - "net/http" - "path/filepath" - "golang.org/x/net/context" + "fmt" + "tes/server" + "tes/ga4gh" "github.com/gorilla/mux" - "google.golang.org/grpc" "github.com/grpc-ecosystem/grpc-gateway/runtime" - "ga4gh-tasks" - "ga4gh-server" - "runtime/debug" + "golang.org/x/net/context" + "google.golang.org/grpc" "log" - "ga4gh-engine" - "ga4gh-engine/scaling" + "net/http" + "os" + "path/filepath" + "runtime/debug" ) - - func main() { http_port := flag.String("port", "8000", "HTTP Port") rpc_port := flag.String("rpc", "9090", "HTTP Port") storage_dir_arg := flag.String("storage", "storage", "Storage Dir") + swift_arg := flag.Bool("swift", false, "Use SWIFT object store") task_db := flag.String("db", "ga4gh_tasks.db", "Task DB File") - scaler_name := flag.String("scaler", "local", "Scaler") flag.Parse() - - dir, _ := filepath.Abs(os.Args[0]) - content_dir := filepath.Join(dir, "..", "..", "share") - - config := map[string]string{} - //get scaler - scaler := ga4gh_engine_scaling.ScalingMethods[*scaler_name](config) + dir, _ := filepath.Abs(os.Args[0]) + content_dir := filepath.Join(dir, "..", "..", "share") //server meta-data storage_dir, _ := filepath.Abs(*storage_dir_arg) - meta_data := map[string]string{ "storageType" : "sharedFile", "baseDir" : storage_dir } + var meta_data = make(map[string]string) + if !*swift_arg { + meta_data["storageType"] = "sharedFile" + meta_data["baseDir"] = storage_dir + } else { + meta_data["storageType"] = "swift" + } //setup GRPC listener - taski := ga4gh_task.NewTaskBolt(*task_db, meta_data) //ga4gh_task.NewTaskImpl() - - //setup scheduler - scheduler := ga4gh_taskengine.Scheduler(taski, scaler) + taski := tes_server.NewTaskBolt(*task_db, meta_data) - server := ga4gh_task.NewGA4GHServer() + server := tes_server.NewGA4GHServer() server.RegisterTaskServer(taski) - server.RegisterScheduleServer(scheduler) + server.RegisterScheduleServer(taski) server.Start(*rpc_port) //setup RESTful proxy @@ -58,9 +52,9 @@ func main() { ctx, cancel := context.WithCancel(ctx) defer cancel() opts := []grpc.DialOption{grpc.WithInsecure()} - log.Println("Proxy connecting to localhost:" + *rpc_port ) - err := ga4gh_task_exec.RegisterTaskServiceHandlerFromEndpoint(ctx, grpc_mux, "localhost:" + *rpc_port, opts) - if (err != nil) { + log.Println("Proxy connecting to localhost:" + *rpc_port) + err := ga4gh_task_exec.RegisterTaskServiceHandlerFromEndpoint(ctx, grpc_mux, "localhost:"+*rpc_port, opts) + if err != nil { fmt.Println("Register Error", err) } @@ -74,12 +68,12 @@ func main() { } // Routes consist of a path and a handler function r.HandleFunc("/", - func (w http.ResponseWriter, r *http.Request) { + func(w http.ResponseWriter, r *http.Request) { http.ServeFile(w, r, filepath.Join(content_dir, "index.html")) }) r.PathPrefix("/static/").Handler(http.StripPrefix("/static/", http.FileServer(http.Dir(content_dir)))) r.PathPrefix("/v1/").Handler(grpc_mux) log.Printf("Listening on port: %s\n", *http_port) - http.ListenAndServe(":" + *http_port, r) -} \ No newline at end of file + http.ListenAndServe(":"+*http_port, r) +} diff --git a/src/ga4gh-worker/worker.go b/src/tes-worker/worker.go similarity index 52% rename from src/ga4gh-worker/worker.go rename to src/tes-worker/worker.go index 6460748..901c803 100644 --- a/src/ga4gh-worker/worker.go +++ b/src/tes-worker/worker.go @@ -1,23 +1,22 @@ - package main import ( - "os" - "log" "flag" - "ga4gh-engine/worker" + "tes/worker" + "tes/server/proto" + uuid "github.com/nu7hatch/gouuid" "google.golang.org/grpc" + "log" + "os" "path/filepath" - uuid "github.com/nu7hatch/gouuid" "time" - "ga4gh-server/proto" ) - func main() { agro_server := flag.String("master", "localhost:9090", "Master Server") volume_dir_arg := flag.String("volumes", "volumes", "Volume Dir") storage_dir_arg := flag.String("storage", "storage", "Storage Dir") + swift_dir_arg := flag.String("swift", "", "Cache Swift items in directory") timeout_arg := flag.Int("timeout", -1, "Timeout in seconds") nworker := flag.Int("nworkers", 4, "Worker Count") @@ -26,10 +25,7 @@ func main() { if _, err := os.Stat(volume_dir); os.IsNotExist(err) { os.Mkdir(volume_dir, 0700) } - storage_dir, _ := filepath.Abs(*storage_dir_arg) - if _, err := os.Stat(storage_dir); os.IsNotExist(err) { - os.Mkdir(storage_dir, 0700) - } + log.Println("Connecting GA4GH Task Server") conn, err := grpc.Dial(*agro_server, grpc.WithInsecure()) if err != nil { @@ -38,25 +34,41 @@ func main() { defer conn.Close() sched_client := ga4gh_task_ref.NewSchedulerClient(conn) - file_client := ga4gh_taskengine_worker.NewSharedFS(&sched_client, storage_dir, volume_dir) + var file_client tes_taskengine_worker.FileSystemAccess = nil + + if *swift_dir_arg != "" { + storage_dir, _ := filepath.Abs(*swift_dir_arg) + if _, err := os.Stat(storage_dir); os.IsNotExist(err) { + os.Mkdir(storage_dir, 0700) + } + + file_client = tes_taskengine_worker.NewSwiftAccess() + } else { + storage_dir, _ := filepath.Abs(*storage_dir_arg) + if _, err := os.Stat(storage_dir); os.IsNotExist(err) { + os.Mkdir(storage_dir, 0700) + } + file_client = tes_taskengine_worker.NewSharedFS(storage_dir) + } + file_mapper := tes_taskengine_worker.NewFileMapper(&sched_client, file_client, volume_dir) u, _ := uuid.NewV4() - manager, _ := ga4gh_taskengine_worker.NewLocalManager(*nworker, u.String()) + manager, _ := tes_taskengine_worker.NewLocalManager(*nworker, u.String()) if *timeout_arg <= 0 { - manager.Run(sched_client, file_client) + manager.Run(sched_client, *file_mapper) } else { var start_count int32 = 0 last_ping := time.Now().Unix() - manager.SetStatusCheck( func(status ga4gh_taskengine_worker.EngineStatus) { + manager.SetStatusCheck(func(status tes_taskengine_worker.EngineStatus) { if status.JobCount > start_count || status.ActiveJobs > 0 { start_count = status.JobCount last_ping = time.Now().Unix() } - } ) - manager.Start(sched_client, file_client) - for time.Now().Unix() - last_ping < int64(*timeout_arg) { + }) + manager.Start(sched_client, *file_mapper) + for time.Now().Unix()-last_ping < int64(*timeout_arg) { time.Sleep(5 * time.Second) } } -} \ No newline at end of file +} diff --git a/src/ga4gh-tasks/task_execution.pb.go b/src/tes/ga4gh/task_execution.pb.go similarity index 77% rename from src/ga4gh-tasks/task_execution.pb.go rename to src/tes/ga4gh/task_execution.pb.go index 06f7366..9fe8bb3 100644 --- a/src/ga4gh-tasks/task_execution.pb.go +++ b/src/tes/ga4gh/task_execution.pb.go @@ -100,9 +100,15 @@ type TaskParameter struct { // path in the machine file system. Note, this MUST be a path that exists // within one of the defined volumes Path string `protobuf:"bytes,4,opt,name=path" json:"path,omitempty"` - // data is a directory, if used for an output all the files in the directory + // Type of data, "File" or "Directory" + // if used for an output all the files in the directory // will be copied to the storage location - Directory bool `protobuf:"varint,5,opt,name=directory" json:"directory,omitempty"` + Class string `protobuf:"bytes,5,opt,name=class" json:"class,omitempty"` + // if the parameter is an output, should the element be created before executing + // the command. For example if saving the working directory as an output, + // the directory needs to exist before the command is called. If false, it is + // assumed that the user will create the element as a part of the operation + Create bool `protobuf:"varint,6,opt,name=create" json:"create,omitempty"` } func (m *TaskParameter) Reset() { *m = TaskParameter{} } @@ -116,10 +122,12 @@ type DockerExecutor struct { ImageName string `protobuf:"bytes,1,opt,name=imageName" json:"imageName,omitempty"` // The command to be executed Cmd []string `protobuf:"bytes,2,rep,name=cmd" json:"cmd,omitempty"` + // The working directory that the command will be executed in + Workdir string `protobuf:"bytes,3,opt,name=workdir" json:"workdir,omitempty"` // Path for stdout recording, blank if not storing to file - Stdout string `protobuf:"bytes,3,opt,name=stdout" json:"stdout,omitempty"` + Stdout string `protobuf:"bytes,4,opt,name=stdout" json:"stdout,omitempty"` // Path for stderr recording, blank if not storing to file - Stderr string `protobuf:"bytes,4,opt,name=stderr" json:"stderr,omitempty"` + Stderr string `protobuf:"bytes,5,opt,name=stderr" json:"stderr,omitempty"` } func (m *DockerExecutor) Reset() { *m = DockerExecutor{} } @@ -312,7 +320,7 @@ func (*JobId) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} } type JobLog struct { // The command line that was run - CommandLine string `protobuf:"bytes,1,opt,name=commandLine" json:"commandLine,omitempty"` + Cmd []string `protobuf:"bytes,1,rep,name=cmd" json:"cmd,omitempty"` // When the command was executed StartTime string `protobuf:"bytes,2,opt,name=startTime" json:"startTime,omitempty"` // When the command completed @@ -632,67 +640,67 @@ var _TaskService_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("task_execution.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 979 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xb4, 0x56, 0x4d, 0x6f, 0x23, 0x45, - 0x10, 0xc5, 0x1f, 0x33, 0xb6, 0xcb, 0xeb, 0x0f, 0x7a, 0xb3, 0x89, 0x65, 0xad, 0xd8, 0xd5, 0x2c, - 0x48, 0x51, 0xa4, 0xb5, 0x85, 0xc9, 0x01, 0xed, 0xd5, 0x84, 0x55, 0x56, 0x59, 0x30, 0x49, 0x96, - 0x13, 0x28, 0x1a, 0xcf, 0x54, 0xbc, 0x13, 0x7b, 0xba, 0x67, 0x7b, 0x7a, 0x42, 0xb2, 0x88, 0x0b, - 0x27, 0x24, 0xc4, 0x89, 0x9f, 0x86, 0xf8, 0x07, 0x9c, 0xf9, 0x0d, 0x54, 0xf7, 0x8c, 0x3f, 0xe2, - 0xb1, 0x03, 0x17, 0x6e, 0x9e, 0xee, 0x57, 0x55, 0xaf, 0x5e, 0xbd, 0x6a, 0x19, 0x76, 0x94, 0x1b, - 0x4f, 0x2f, 0xf0, 0x06, 0xbd, 0x44, 0x05, 0x82, 0xf7, 0x22, 0x29, 0x94, 0x60, 0xad, 0x89, 0x7b, - 0x38, 0x79, 0x7b, 0xb1, 0xb8, 0xeb, 0x3e, 0x9e, 0x08, 0x31, 0x99, 0x61, 0xdf, 0x8d, 0x82, 0xbe, - 0xcb, 0xb9, 0x50, 0xae, 0x46, 0xc7, 0x29, 0xdc, 0x41, 0x68, 0x9c, 0x13, 0x74, 0xe4, 0x4a, 0x37, - 0x44, 0x85, 0x92, 0x3d, 0x80, 0x32, 0xa7, 0x9f, 0x9d, 0xc2, 0xd3, 0xc2, 0x7e, 0x8d, 0x3d, 0x84, - 0xba, 0x8f, 0xb1, 0x27, 0x83, 0x48, 0x07, 0x75, 0x8a, 0xe6, 0xb0, 0x0d, 0xd5, 0x99, 0xf0, 0x4c, - 0x9a, 0x4e, 0xc9, 0x9c, 0x50, 0x50, 0xe4, 0xaa, 0xb7, 0x9d, 0xb2, 0xf9, 0xfa, 0x10, 0x6a, 0x7e, - 0x20, 0xd1, 0x53, 0x42, 0xde, 0x76, 0x2c, 0x3a, 0xaa, 0x3a, 0x23, 0x68, 0x7e, 0x21, 0xbc, 0x29, - 0xca, 0x23, 0x43, 0x57, 0x48, 0x0d, 0x0a, 0x42, 0x77, 0x82, 0x5f, 0x2d, 0x8b, 0xd5, 0xa1, 0xe4, - 0x85, 0x3e, 0x15, 0x29, 0xd1, 0x47, 0x13, 0xec, 0x58, 0xf9, 0x22, 0x51, 0x59, 0x89, 0xf4, 0x1b, - 0xa5, 0x4c, 0x8b, 0x38, 0xaf, 0xc0, 0xfe, 0x56, 0xcc, 0x92, 0x10, 0xd7, 0x18, 0x6b, 0x5c, 0xf0, - 0x1e, 0x5f, 0x8e, 0x0d, 0xd9, 0x86, 0xf9, 0x16, 0x89, 0xf4, 0x30, 0xcb, 0xc3, 0x00, 0x42, 0x91, - 0x70, 0x35, 0x12, 0x01, 0x57, 0x1d, 0xdb, 0xe4, 0xfa, 0xa5, 0x00, 0xb5, 0x53, 0x4c, 0x61, 0x31, - 0xdb, 0x83, 0x56, 0x18, 0xf0, 0x20, 0x4c, 0xc2, 0x61, 0x94, 0x0c, 0x85, 0xc4, 0xd8, 0xa4, 0x6e, - 0x68, 0x31, 0x22, 0x89, 0x18, 0x92, 0x16, 0xe3, 0x19, 0x9a, 0xfc, 0x55, 0xb6, 0x03, 0x0f, 0x32, - 0xf4, 0xa9, 0x1b, 0x52, 0xd5, 0x92, 0x81, 0xee, 0x43, 0xe5, 0xda, 0xb0, 0x8b, 0x89, 0x6e, 0x69, - 0xbf, 0x3e, 0xd8, 0xeb, 0xad, 0xcd, 0xa5, 0x97, 0xb1, 0x6f, 0x80, 0xf5, 0x5e, 0x70, 0xc2, 0x59, - 0xba, 0x6d, 0xe7, 0xb7, 0x22, 0x94, 0xf5, 0x40, 0xd6, 0xba, 0x22, 0xb5, 0x68, 0x5e, 0x57, 0xa4, - 0xe9, 0xb1, 0x9f, 0x4d, 0x61, 0x6d, 0x34, 0x69, 0x77, 0x3d, 0xb0, 0x03, 0x1e, 0x25, 0x6a, 0x5e, - 0xf6, 0xa3, 0x5c, 0xd9, 0xbb, 0xd3, 0xee, 0x43, 0x85, 0x24, 0x36, 0x01, 0xd6, 0x7f, 0x0a, 0x78, - 0x0e, 0x35, 0x39, 0x57, 0xca, 0xa8, 0x57, 0x1f, 0x74, 0x73, 0x21, 0x4b, 0x2d, 0x49, 0x7d, 0x7d, - 0x4c, 0xa4, 0x2b, 0x86, 0x5f, 0x1f, 0x6c, 0xdf, 0xf8, 0xa0, 0x53, 0x35, 0xe5, 0x9e, 0xe4, 0x62, - 0xef, 0xda, 0xc4, 0xf9, 0x1e, 0x5a, 0x9a, 0xc0, 0x49, 0x10, 0xab, 0x53, 0x7c, 0x97, 0x60, 0xac, - 0xee, 0x6a, 0x51, 0x98, 0x0f, 0x55, 0x8b, 0x35, 0x92, 0x78, 0x19, 0xdc, 0x2c, 0x5d, 0x1a, 0x91, - 0xbf, 0xce, 0xc8, 0x0c, 0xd9, 0x50, 0x74, 0x20, 0x9d, 0x9c, 0x8b, 0x29, 0xf2, 0xcc, 0x45, 0x5f, - 0x43, 0x7b, 0x99, 0x3e, 0x8e, 0x68, 0x2f, 0x90, 0x7d, 0x0c, 0x96, 0xa6, 0xa3, 0xa7, 0xae, 0x29, - 0x3e, 0xda, 0xa8, 0x08, 0x7b, 0x04, 0x0d, 0x8e, 0x37, 0x6a, 0xb4, 0x48, 0x68, 0xaa, 0x3a, 0xdf, - 0x41, 0xf3, 0x95, 0x18, 0xff, 0x5f, 0x74, 0x4f, 0xa0, 0xb5, 0xc8, 0x9e, 0xb1, 0x75, 0xa0, 0x7c, - 0x25, 0xc6, 0x73, 0xb2, 0x3b, 0x39, 0xb2, 0x84, 0xdf, 0xc6, 0x75, 0x0f, 0xec, 0x73, 0x33, 0x1c, - 0x6d, 0xc2, 0x6b, 0x77, 0x96, 0x64, 0x6e, 0x73, 0x76, 0xc1, 0xa2, 0xb0, 0xfc, 0x79, 0x0c, 0xb6, - 0x2e, 0x2f, 0x26, 0xda, 0x7c, 0x9e, 0x08, 0x43, 0x97, 0xfb, 0x27, 0x01, 0x5f, 0x31, 0x69, 0xac, - 0x5c, 0xa9, 0xce, 0x83, 0x10, 0xb3, 0xae, 0x5a, 0x50, 0x41, 0xee, 0x9b, 0x83, 0xd5, 0x35, 0xd6, - 0x6b, 0x5d, 0x5e, 0x5b, 0x6b, 0x6b, 0x2e, 0x03, 0xde, 0x04, 0x6a, 0x28, 0x7c, 0x34, 0xf6, 0xb2, - 0x9c, 0xbf, 0x0b, 0x50, 0xd2, 0x4d, 0x10, 0x97, 0x2b, 0x4d, 0x2a, 0x2b, 0x76, 0x08, 0x55, 0x72, - 0xa4, 0xeb, 0xbb, 0xca, 0x35, 0x2f, 0x46, 0x7d, 0xe0, 0x6c, 0xea, 0xbd, 0xf7, 0x3a, 0x03, 0x1d, - 0x71, 0x25, 0x6f, 0xd9, 0x33, 0x28, 0xeb, 0x6b, 0x43, 0x66, 0xeb, 0x68, 0x3f, 0x01, 0x8b, 0xfa, - 0x50, 0x68, 0x28, 0x36, 0x07, 0xbb, 0x39, 0xd4, 0x99, 0xbe, 0x25, 0x58, 0x79, 0x26, 0x26, 0xf3, - 0xc5, 0xd9, 0xdb, 0x54, 0x9d, 0xa4, 0xea, 0xf6, 0xa1, 0x71, 0x97, 0x03, 0x3d, 0x73, 0x53, 0xbc, - 0xcd, 0xda, 0x58, 0x28, 0x6c, 0xf4, 0x7a, 0x51, 0xfc, 0xbc, 0xe0, 0xec, 0x00, 0x3b, 0x43, 0x79, - 0x1d, 0x78, 0x78, 0xcc, 0x2f, 0x45, 0x66, 0x23, 0xe7, 0xd7, 0x02, 0xd4, 0x57, 0x8e, 0xd9, 0x97, - 0xd0, 0x88, 0x69, 0x41, 0x68, 0xa4, 0x43, 0xc1, 0x2f, 0x83, 0x49, 0x66, 0x80, 0x7e, 0x9e, 0xec, - 0x32, 0x88, 0x88, 0xaf, 0x44, 0x18, 0x36, 0xdd, 0x43, 0xaa, 0x96, 0x3b, 0xfd, 0x37, 0x8e, 0x07, - 0xef, 0xc0, 0x4a, 0x45, 0xa8, 0x43, 0xe5, 0x0d, 0x9f, 0x72, 0xf1, 0x03, 0x6f, 0x7f, 0x40, 0xb6, - 0xb6, 0xbf, 0x49, 0x30, 0x41, 0xbf, 0x5d, 0xd0, 0x17, 0xa7, 0x09, 0xe7, 0x01, 0x9f, 0xb4, 0x8b, - 0xfa, 0x62, 0xe4, 0x26, 0x31, 0x5d, 0x94, 0xe8, 0x61, 0xab, 0x0e, 0x45, 0x18, 0xcd, 0xe8, 0x39, - 0x69, 0x97, 0x59, 0x0d, 0xac, 0x23, 0x29, 0x85, 0x6c, 0x5b, 0xe4, 0x95, 0xfa, 0xd9, 0x6d, 0xac, - 0x30, 0x4c, 0x0f, 0x6c, 0x83, 0x74, 0xb9, 0x87, 0x33, 0x8a, 0xab, 0x0c, 0xfe, 0x2c, 0x41, 0x5d, - 0x8f, 0x27, 0xeb, 0x87, 0x85, 0xd0, 0x7c, 0x89, 0x6a, 0x55, 0x92, 0x67, 0xf7, 0xf5, 0x9e, 0xe9, - 0xd8, 0x7d, 0x7c, 0x1f, 0xc8, 0xe9, 0xfc, 0xfc, 0xc7, 0x5f, 0xbf, 0x17, 0x19, 0x6b, 0xf7, 0xaf, - 0x3f, 0xed, 0xeb, 0xbd, 0x7a, 0x1e, 0x67, 0xe5, 0x5e, 0x9b, 0x7e, 0xd2, 0xd5, 0xdf, 0x68, 0x9b, - 0xee, 0xee, 0x26, 0x07, 0x1c, 0xfb, 0xce, 0x43, 0x93, 0xb3, 0xe1, 0x54, 0xe7, 0x39, 0x5f, 0x14, - 0x0e, 0xd8, 0x05, 0x54, 0xf5, 0x1a, 0x13, 0x22, 0x66, 0x4f, 0x36, 0x5a, 0x67, 0xf9, 0x84, 0x74, - 0x9f, 0x6e, 0x07, 0xa4, 0xaf, 0x80, 0xd3, 0x36, 0x35, 0x80, 0x2d, 0x6a, 0xb0, 0x11, 0xd8, 0x24, - 0x8f, 0x5e, 0x9c, 0x2d, 0xbc, 0xba, 0x1b, 0xdf, 0x8a, 0xbc, 0x02, 0xfd, 0x1f, 0xcd, 0xf0, 0x7f, - 0x62, 0x6f, 0xa0, 0x96, 0x8e, 0xe3, 0xbe, 0xa4, 0xdb, 0x44, 0xc8, 0xd2, 0x1e, 0xe4, 0xd2, 0x8e, - 0x6d, 0xf3, 0x47, 0xe4, 0xb3, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x8f, 0x49, 0x18, 0x5b, 0xcf, - 0x08, 0x00, 0x00, + // 986 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xb4, 0x56, 0x4f, 0x6f, 0x1b, 0x45, + 0x14, 0x67, 0x6d, 0xef, 0xda, 0x7e, 0xae, 0xff, 0x30, 0x4d, 0x1b, 0xcb, 0xaa, 0x68, 0xb5, 0x05, + 0x29, 0x8a, 0x54, 0x5b, 0x98, 0x1c, 0x50, 0xaf, 0x26, 0x54, 0xa9, 0x5a, 0x30, 0x49, 0xca, 0x09, + 0x14, 0x8d, 0x77, 0x27, 0xee, 0xc6, 0xde, 0x99, 0xcd, 0xec, 0x6c, 0x9a, 0x14, 0x71, 0xe1, 0x84, + 0x84, 0x38, 0xf1, 0xd1, 0x10, 0xdf, 0x80, 0x33, 0x9f, 0x81, 0x37, 0xb3, 0x63, 0x3b, 0xc9, 0xda, + 0xa1, 0x17, 0x6e, 0x3b, 0x6f, 0xde, 0x9f, 0xdf, 0xfb, 0xbd, 0xdf, 0x1b, 0x2d, 0x6c, 0x29, 0x9a, + 0xce, 0x4e, 0xd8, 0x25, 0x0b, 0x32, 0x15, 0x09, 0xde, 0x4f, 0xa4, 0x50, 0x82, 0xb4, 0xa7, 0x74, + 0x6f, 0xfa, 0xf6, 0x64, 0x79, 0xd7, 0x7b, 0x34, 0x15, 0x62, 0x3a, 0x67, 0x03, 0x9a, 0x44, 0x03, + 0xca, 0xb9, 0x50, 0x54, 0x7b, 0xa7, 0xb9, 0xbb, 0x7f, 0x0e, 0xcd, 0x63, 0x74, 0x1d, 0x53, 0x49, + 0x63, 0xa6, 0x98, 0x24, 0xf7, 0xa0, 0xc2, 0xf1, 0xb3, 0xeb, 0x3c, 0x71, 0x76, 0xea, 0xe4, 0x3e, + 0x34, 0x42, 0x96, 0x06, 0x32, 0x4a, 0x74, 0x50, 0xb7, 0x64, 0x8c, 0x1d, 0xa8, 0xcd, 0x45, 0x60, + 0xd2, 0x74, 0xcb, 0xc6, 0x82, 0x41, 0x09, 0x55, 0x6f, 0xbb, 0x15, 0x73, 0x6a, 0x82, 0x1b, 0xcc, + 0x69, 0x9a, 0x76, 0x5d, 0x73, 0x6c, 0x81, 0x17, 0x48, 0x46, 0x15, 0xeb, 0x7a, 0x78, 0xae, 0xf9, + 0x14, 0x5a, 0x5f, 0x89, 0x60, 0xc6, 0xe4, 0xbe, 0x81, 0x2e, 0x24, 0xf9, 0x18, 0xea, 0x51, 0x4c, + 0xa7, 0xec, 0x9b, 0x55, 0xe1, 0x06, 0x94, 0x83, 0x38, 0xc4, 0x82, 0x65, 0x3c, 0xb4, 0xa1, 0xfa, + 0x4e, 0xc8, 0x59, 0x18, 0x49, 0x5b, 0x0f, 0x53, 0xa6, 0x2a, 0x14, 0x99, 0xb2, 0x15, 0xf3, 0x33, + 0x93, 0x32, 0x2f, 0xe9, 0xbf, 0x04, 0xef, 0x7b, 0x31, 0xcf, 0x62, 0x76, 0xab, 0x1d, 0xed, 0x17, + 0xbd, 0x67, 0x2f, 0x26, 0xa6, 0x93, 0xa6, 0x39, 0x8b, 0x4c, 0x06, 0xcc, 0xe6, 0x25, 0x00, 0xb1, + 0xc8, 0xb8, 0x1a, 0x8b, 0x88, 0x2b, 0x03, 0xb7, 0xee, 0xff, 0xea, 0x40, 0xfd, 0x90, 0xe5, 0x6e, + 0x29, 0xd9, 0x86, 0x76, 0x1c, 0xf1, 0x28, 0xce, 0xe2, 0x51, 0x92, 0x8d, 0x84, 0x64, 0xa9, 0x49, + 0xdd, 0xd4, 0x4c, 0x25, 0x92, 0xb1, 0x18, 0x89, 0x9a, 0xcc, 0x99, 0xc9, 0x5f, 0x23, 0x5b, 0x70, + 0xcf, 0x7a, 0x1f, 0xd2, 0x18, 0xab, 0x96, 0x8d, 0xeb, 0x0e, 0x54, 0x2f, 0x0c, 0xba, 0x14, 0xe1, + 0x97, 0x77, 0x1a, 0xc3, 0xed, 0xfe, 0xad, 0xa1, 0xf5, 0x2d, 0x7a, 0x64, 0xf2, 0xbd, 0xe0, 0x4c, + 0x33, 0x89, 0x3c, 0xf8, 0xbf, 0x97, 0xa0, 0xa2, 0xa7, 0x75, 0xab, 0x2b, 0xa4, 0x0f, 0x87, 0x79, + 0xc6, 0x02, 0x75, 0x10, 0xda, 0x11, 0xdd, 0x9a, 0x5b, 0xde, 0x5d, 0x1f, 0xbc, 0x88, 0x27, 0x99, + 0x5a, 0x94, 0xfd, 0xa4, 0x50, 0xf6, 0xa6, 0x14, 0x06, 0x50, 0x45, 0x8a, 0x4d, 0x80, 0xfb, 0x41, + 0x01, 0xcf, 0xa0, 0x2e, 0x17, 0x4c, 0x19, 0xf6, 0x1a, 0xc3, 0x5e, 0x21, 0x64, 0xc5, 0x25, 0xb2, + 0xaf, 0xcd, 0x08, 0xba, 0x6a, 0xf0, 0x0d, 0xc0, 0x0b, 0x8d, 0x30, 0xba, 0x35, 0x53, 0xee, 0x71, + 0x21, 0xf6, 0xa6, 0x6e, 0xfc, 0x1f, 0xa1, 0xad, 0x01, 0xbc, 0x8a, 0x52, 0x75, 0xc8, 0xce, 0x33, + 0x96, 0xaa, 0x9b, 0x5c, 0x38, 0x8b, 0xa1, 0x6a, 0xb2, 0xc6, 0x92, 0x9d, 0x46, 0x97, 0x2b, 0x09, + 0x27, 0x28, 0xb8, 0x23, 0x14, 0x83, 0x1d, 0x8a, 0x0e, 0x44, 0xcb, 0xb1, 0x98, 0x31, 0x9e, 0xab, + 0xca, 0xff, 0x16, 0x3a, 0xab, 0xf4, 0x69, 0x82, 0x4b, 0xc3, 0xc8, 0xa7, 0xe0, 0x6a, 0x38, 0x7a, + 0xea, 0x1a, 0xe2, 0x83, 0xb5, 0x8c, 0x90, 0x07, 0xd0, 0xe4, 0xec, 0x52, 0x8d, 0x97, 0x09, 0x4d, + 0x55, 0xff, 0x07, 0x68, 0xbd, 0x14, 0x93, 0xff, 0x0b, 0xee, 0x2b, 0x68, 0x2f, 0xb3, 0x5b, 0xb4, + 0x3e, 0x54, 0xce, 0xc4, 0x64, 0x01, 0x76, 0xab, 0x00, 0x16, 0xfd, 0x37, 0x61, 0xdd, 0x06, 0xef, + 0xd8, 0x0c, 0x47, 0x8b, 0xf0, 0x82, 0xce, 0x33, 0xab, 0x36, 0xff, 0x21, 0xb8, 0x18, 0x56, 0xb4, + 0xcf, 0xc0, 0xd3, 0xe5, 0xc5, 0x74, 0xb1, 0xbb, 0x8e, 0xd9, 0x5d, 0x04, 0x9a, 0x2a, 0x2a, 0xd5, + 0x71, 0x14, 0x33, 0xdb, 0x0d, 0xae, 0x33, 0xe3, 0xa1, 0x31, 0x7c, 0xd0, 0x3a, 0xeb, 0xf6, 0xd9, + 0x65, 0xa4, 0x46, 0x22, 0xcc, 0xdf, 0x10, 0xd7, 0xff, 0xc7, 0x81, 0xb2, 0x06, 0x8f, 0x18, 0xce, + 0x34, 0x18, 0xcb, 0xdd, 0x1e, 0xd4, 0x50, 0x89, 0x34, 0xa4, 0x8a, 0x9a, 0xa7, 0xa3, 0x31, 0xf4, + 0xd7, 0xf5, 0xdc, 0x7f, 0x6d, 0x9d, 0xf6, 0xb9, 0x92, 0x57, 0xe4, 0x29, 0x54, 0xf4, 0xb5, 0x01, + 0xb3, 0x71, 0xa4, 0x9f, 0x81, 0x8b, 0x7d, 0xe0, 0x23, 0xa6, 0x21, 0xb6, 0x86, 0x0f, 0x0b, 0x5e, + 0x47, 0xfa, 0x16, 0xdd, 0x2a, 0x73, 0x31, 0x5d, 0x2c, 0xcc, 0xf6, 0xba, 0xea, 0x48, 0x51, 0x6f, + 0x00, 0xcd, 0x9b, 0x18, 0x90, 0xb3, 0x19, 0xbb, 0xb2, 0x6d, 0x2c, 0x99, 0x35, 0x7c, 0x3d, 0x2f, + 0x7d, 0xe9, 0xf8, 0x5b, 0x40, 0x8e, 0x98, 0xbc, 0x88, 0x02, 0x76, 0xc0, 0x4f, 0x85, 0x95, 0x8f, + 0xff, 0x9b, 0x03, 0x8d, 0x6b, 0x66, 0xf2, 0x35, 0x34, 0x53, 0x5c, 0x0c, 0x1c, 0xe5, 0x48, 0xf0, + 0xd3, 0x68, 0x6a, 0x07, 0x3f, 0x28, 0x82, 0x5d, 0x05, 0x21, 0xf0, 0x6b, 0x11, 0x06, 0x4d, 0x6f, + 0x0f, 0xab, 0x15, 0xac, 0xff, 0x85, 0x71, 0xf7, 0x1c, 0xdc, 0x9c, 0x84, 0x06, 0x54, 0xdf, 0xf0, + 0x19, 0x17, 0xef, 0x78, 0xe7, 0x23, 0x94, 0xb3, 0xf7, 0x5d, 0xc6, 0x32, 0x16, 0x76, 0x1c, 0x7d, + 0x71, 0x98, 0x71, 0x1e, 0xf1, 0x69, 0xa7, 0xa4, 0x2f, 0xc6, 0x34, 0x4b, 0xf1, 0xa2, 0x8c, 0x0f, + 0x5a, 0x6d, 0x24, 0xe2, 0x64, 0x8e, 0xcf, 0x48, 0xa7, 0x42, 0xea, 0xe0, 0xee, 0x4b, 0x29, 0x64, + 0xc7, 0x45, 0xad, 0x34, 0x8e, 0xae, 0x52, 0xc5, 0xe2, 0xdc, 0xe0, 0x19, 0x4f, 0xca, 0x03, 0x36, + 0xc7, 0xb8, 0xea, 0xf0, 0xaf, 0x32, 0x34, 0xf4, 0x78, 0x6c, 0x3f, 0x24, 0x86, 0xd6, 0x0b, 0xa6, + 0xae, 0x53, 0xf2, 0xf4, 0xae, 0xde, 0x2d, 0x8f, 0xbd, 0x47, 0x77, 0x39, 0xf9, 0xdd, 0x5f, 0xfe, + 0xfc, 0xfb, 0x8f, 0x12, 0x21, 0x9d, 0xc1, 0xc5, 0xe7, 0x03, 0xbd, 0x4f, 0xcf, 0x52, 0x5b, 0xee, + 0xb5, 0xe9, 0x27, 0x5f, 0xf9, 0xb5, 0xb2, 0xe9, 0x3d, 0x5c, 0xa7, 0x80, 0x83, 0xd0, 0xbf, 0x6f, + 0x72, 0x36, 0xfd, 0xda, 0x22, 0xe7, 0x73, 0x67, 0x97, 0x9c, 0x40, 0x4d, 0xaf, 0x2f, 0x7a, 0xa4, + 0xe4, 0xf1, 0x5a, 0xe9, 0xac, 0x9e, 0x8e, 0xde, 0x93, 0xcd, 0x0e, 0xf9, 0xf6, 0xfb, 0x1d, 0x53, + 0x03, 0xc8, 0xb2, 0x06, 0x19, 0x83, 0x87, 0xf4, 0xe8, 0xc5, 0xd9, 0x80, 0xab, 0xb7, 0xf6, 0x8d, + 0x28, 0x32, 0x30, 0xf8, 0xc9, 0x0c, 0xff, 0x67, 0xf2, 0x06, 0xea, 0xf9, 0x38, 0xee, 0x4a, 0xba, + 0x89, 0x04, 0x9b, 0x76, 0xb7, 0x90, 0x76, 0xe2, 0x99, 0xbf, 0x93, 0x2f, 0xfe, 0x0d, 0x00, 0x00, + 0xff, 0xff, 0x7d, 0xdb, 0x49, 0xba, 0xe4, 0x08, 0x00, 0x00, } diff --git a/src/ga4gh-tasks/task_execution.pb.gw.go b/src/tes/ga4gh/task_execution.pb.gw.go similarity index 100% rename from src/ga4gh-tasks/task_execution.pb.gw.go rename to src/tes/ga4gh/task_execution.pb.gw.go diff --git a/src/ga4gh-server/proto/task_worker.pb.go b/src/tes/server/proto/task_worker.pb.go similarity index 99% rename from src/ga4gh-server/proto/task_worker.pb.go rename to src/tes/server/proto/task_worker.pb.go index 14c7296..4ae0f76 100644 --- a/src/ga4gh-server/proto/task_worker.pb.go +++ b/src/tes/server/proto/task_worker.pb.go @@ -21,7 +21,7 @@ package ga4gh_task_ref import proto "github.com/golang/protobuf/proto" import fmt "fmt" import math "math" -import ga4gh_task_exec "ga4gh-tasks" +import ga4gh_task_exec "tes/ga4gh" import ( context "golang.org/x/net/context" diff --git a/src/ga4gh-server/task_boltdb.go b/src/tes/server/task_boltdb.go similarity index 87% rename from src/ga4gh-server/task_boltdb.go rename to src/tes/server/task_boltdb.go index 417662c..61bab13 100644 --- a/src/ga4gh-server/task_boltdb.go +++ b/src/tes/server/task_boltdb.go @@ -1,19 +1,17 @@ - -package ga4gh_task +package tes_server import ( - "golang.org/x/net/context" - "ga4gh-tasks" + "fmt" + "tes/server/proto" + "tes/ga4gh" "github.com/boltdb/bolt" - uuid "github.com/nu7hatch/gouuid" proto "github.com/golang/protobuf/proto" - "fmt" + uuid "github.com/nu7hatch/gouuid" + "golang.org/x/net/context" "log" "strings" - "ga4gh-server/proto" ) - var TASK_BUCKET = []byte("tasks") var JOBS_QUEUED = []byte("jobs-queued") @@ -23,36 +21,34 @@ var JOBS_COMPLETE = []byte("jobs-complete") var JOBS_LOG = []byte("jobs-log") type TaskBolt struct { - db *bolt.DB + db *bolt.DB storage_metadata map[string]string } - func NewTaskBolt(path string, storage_metadata map[string]string) *TaskBolt { db, _ := bolt.Open(path, 0600, nil) //Check to make sure all the required buckets have been created db.Update(func(tx *bolt.Tx) error { - if (tx.Bucket(TASK_BUCKET) == nil) { + if tx.Bucket(TASK_BUCKET) == nil { tx.CreateBucket(TASK_BUCKET) } - if (tx.Bucket(JOBS_QUEUED) == nil) { + if tx.Bucket(JOBS_QUEUED) == nil { tx.CreateBucket(JOBS_QUEUED) } - if (tx.Bucket(JOBS_ACTIVE) == nil) { + if tx.Bucket(JOBS_ACTIVE) == nil { tx.CreateBucket(JOBS_ACTIVE) } - if (tx.Bucket(JOBS_COMPLETE) == nil) { + if tx.Bucket(JOBS_COMPLETE) == nil { tx.CreateBucket(JOBS_COMPLETE) } - if (tx.Bucket(JOBS_LOG) == nil) { + if tx.Bucket(JOBS_LOG) == nil { tx.CreateBucket(JOBS_LOG) } return nil }) - return &TaskBolt{db:db, storage_metadata:storage_metadata} + return &TaskBolt{db: db, storage_metadata: storage_metadata} } - // / Run a task func (self *TaskBolt) RunTask(ctx context.Context, task *ga4gh_task_exec.Task) (*ga4gh_task_exec.JobId, error) { log.Println("Recieving Task for Queue", task) @@ -60,7 +56,7 @@ func (self *TaskBolt) RunTask(ctx context.Context, task *ga4gh_task_exec.Task) ( taskopId, _ := uuid.NewV4() task.TaskId = taskopId.String() - if (len(task.Docker) == 0) { + if len(task.Docker) == 0 { return nil, fmt.Errorf("No docker commands found") } @@ -82,21 +78,20 @@ func (self *TaskBolt) RunTask(ctx context.Context, task *ga4gh_task_exec.Task) ( taskop_b := tx.Bucket(TASK_BUCKET) v, _ := proto.Marshal(task) - taskop_b.Put( []byte(taskopId.String()), v ) + taskop_b.Put([]byte(taskopId.String()), v) queue_b := tx.Bucket(JOBS_QUEUED) queue_b.Put([]byte(taskopId.String()), []byte(ga4gh_task_exec.State_Queued.String())) - ch <- &ga4gh_task_exec.JobId{Value:taskopId.String()} + ch <- &ga4gh_task_exec.JobId{Value: taskopId.String()} return nil }) if err != nil { return nil, err } - a := <- ch + a := <-ch return a, err } - func (self *TaskBolt) getTaskJob(task *ga4gh_task_exec.Task) (*ga4gh_task_exec.Job, error) { ch := make(chan *ga4gh_task_exec.Job, 1) self.db.View(func(tx *bolt.Tx) error { @@ -137,7 +132,7 @@ func (self *TaskBolt) getTaskJob(task *ga4gh_task_exec.Task) (*ga4gh_task_exec.J ch <- &job return nil }) - a := <- ch + a := <-ch return a, nil } @@ -153,8 +148,8 @@ func (self *TaskBolt) GetJob(ctx context.Context, job *ga4gh_task_exec.JobId) (* ch <- &out return nil }) - a := <- ch - if (a == nil) { + a := <-ch + if a == nil { return nil, fmt.Errorf("Job Not Found") } b, err := self.getTaskJob(a) @@ -207,12 +202,11 @@ func (self *TaskBolt) CancelJob(ctx context.Context, taskop *ga4gh_task_exec.Job return taskop, nil } - func (self *TaskBolt) GetJobToRun(ctx context.Context, request *ga4gh_task_ref.JobRequest) (*ga4gh_task_ref.JobResponse, error) { //log.Printf("Job Request") ch := make(chan *ga4gh_task_exec.Task, 1) - self.db.Update(func (tx *bolt.Tx) error { + self.db.Update(func(tx *bolt.Tx) error { b_q := tx.Bucket(JOBS_QUEUED) b_a := tx.Bucket(JOBS_ACTIVE) b_op := tx.Bucket(TASK_BUCKET) @@ -232,17 +226,17 @@ func (self *TaskBolt) GetJobToRun(ctx context.Context, request *ga4gh_task_ref.J ch <- nil return nil }) - a := <- ch - if (a == nil) { + a := <-ch + if a == nil { return &ga4gh_task_ref.JobResponse{}, nil } job := &ga4gh_task_exec.Job{ - JobId:a.TaskId, - Task:a, + JobId: a.TaskId, + Task: a, } - return &ga4gh_task_ref.JobResponse{Job:job}, nil + return &ga4gh_task_ref.JobResponse{Job: job}, nil } func (self *TaskBolt) UpdateJobStatus(ctx context.Context, stat *ga4gh_task_ref.UpdateStatusRequest) (*ga4gh_task_exec.JobId, error) { @@ -253,9 +247,9 @@ func (self *TaskBolt) UpdateJobStatus(ctx context.Context, stat *ga4gh_task_ref. b_l := tx.Bucket(JOBS_LOG) if stat.Log != nil { - log.Printf("Logging stdout:%s", stat.Log.Stdout ) + log.Printf("Logging stdout:%s", stat.Log.Stdout) d, _ := proto.Marshal(stat.Log) - b_l.Put([]byte(fmt.Sprint("%s-%d", stat.Id, stat.Step)), d ) + b_l.Put([]byte(fmt.Sprint("%s-%d", stat.Id, stat.Step)), d) } switch stat.State { @@ -265,7 +259,7 @@ func (self *TaskBolt) UpdateJobStatus(ctx context.Context, stat *ga4gh_task_ref. } return nil }) - return &ga4gh_task_exec.JobId{Value:stat.Id}, nil + return &ga4gh_task_exec.JobId{Value: stat.Id}, nil } func (self *TaskBolt) WorkerPing(ctx context.Context, info *ga4gh_task_ref.WorkerInfo) (*ga4gh_task_ref.WorkerInfo, error) { @@ -273,9 +267,8 @@ func (self *TaskBolt) WorkerPing(ctx context.Context, info *ga4gh_task_ref.Worke return info, nil } - func (self *TaskBolt) GetServiceInfo(ctx context.Context, info *ga4gh_task_exec.ServiceInfoRequest) (*ga4gh_task_exec.ServiceInfo, error) { - return &ga4gh_task_exec.ServiceInfo{StorageConfig:self.storage_metadata}, nil + return &ga4gh_task_exec.ServiceInfo{StorageConfig: self.storage_metadata}, nil } func (self *TaskBolt) GetQueueInfo(request *ga4gh_task_ref.QueuedTaskInfoRequest, server ga4gh_task_ref.Scheduler_GetQueueInfoServer) error { @@ -287,7 +280,7 @@ func (self *TaskBolt) GetQueueInfo(request *ga4gh_task_ref.QueuedTaskInfoRequest c := bq.Cursor() var count int32 = 0 for k, v := c.First(); k != nil && count < request.MaxTasks; k, v = c.Next() { - if (string(v) == ga4gh_task_exec.State_Queued.String()) { + if string(v) == ga4gh_task_exec.State_Queued.String() { v := bt.Get(k) out := ga4gh_task_exec.Task{} proto.Unmarshal(v, &out) @@ -303,7 +296,7 @@ func (self *TaskBolt) GetQueueInfo(request *ga4gh_task_ref.QueuedTaskInfoRequest for _, i := range m.Inputs { inputs = append(inputs, i.Location) } - server.Send(&ga4gh_task_ref.QueuedTaskInfo{Inputs:inputs, Resources:m.Resources}) + server.Send(&ga4gh_task_ref.QueuedTaskInfo{Inputs: inputs, Resources: m.Resources}) } return nil -} \ No newline at end of file +} diff --git a/src/ga4gh-server/task_server.go b/src/tes/server/task_server.go similarity index 84% rename from src/ga4gh-server/task_server.go rename to src/tes/server/task_server.go index c471e3c..c484afe 100644 --- a/src/ga4gh-server/task_server.go +++ b/src/tes/server/task_server.go @@ -1,23 +1,22 @@ - -package ga4gh_task +package tes_server import ( + "tes/server/proto" + "tes/ga4gh" + "google.golang.org/grpc" "log" "net" - "google.golang.org/grpc" - "ga4gh-tasks" - "ga4gh-server/proto" ) /// Common GA4GH server, multiple services could be placed into the same server /// For the moment there is just the task server type GA4GHServer struct { - task ga4gh_task_exec.TaskServiceServer + task ga4gh_task_exec.TaskServiceServer sched ga4gh_task_ref.SchedulerServer } func NewGA4GHServer() *GA4GHServer { - return &GA4GHServer {} + return &GA4GHServer{} } func (self *GA4GHServer) RegisterTaskServer(task ga4gh_task_exec.TaskServiceServer) { @@ -28,10 +27,8 @@ func (self *GA4GHServer) RegisterScheduleServer(sched ga4gh_task_ref.SchedulerSe self.sched = sched } - - func (self *GA4GHServer) Start(host_port string) { - lis, err := net.Listen("tcp", ":" + host_port) + lis, err := net.Listen("tcp", ":"+host_port) if err != nil { panic("Cannot open port") } diff --git a/src/ga4gh-engine/worker/container.go b/src/tes/worker/container.go similarity index 65% rename from src/ga4gh-engine/worker/container.go rename to src/tes/worker/container.go index 0c1d58e..393fa32 100644 --- a/src/ga4gh-engine/worker/container.go +++ b/src/tes/worker/container.go @@ -1,16 +1,14 @@ - -package ga4gh_taskengine_worker +package tes_taskengine_worker import ( - "os" + "github.com/fsouza/go-dockerclient" "log" + "os" "strings" - "github.com/fsouza/go-dockerclient" ) - type ContainerManager interface { - Run(container string, args []string, binds[] string, remove bool, stdout_path *string, stderr_path *string) error + Run(container string, args []string, binds []string, workdir string, remove bool, stdout_path *string, stderr_path *string) error } type DockerDirect struct { @@ -23,16 +21,20 @@ func NewDockerDirect() *DockerDirect { log.Printf("Docker Error\n") return nil } - return &DockerDirect{ client:client } + return &DockerDirect{client: client} } -func (self *DockerDirect) Run(containerName string, args []string, binds[] string, remove bool, stdout *os.File, stderr *os.File) (int, error) { +func (self *DockerDirect) Run(containerName string, args []string, binds []string, workdir string, remove bool, stdout *os.File, stderr *os.File) (int, error) { create_config := docker.Config{ - Image:containerName, - Cmd:args, - AttachStderr:true, - AttachStdout:true, + Image: containerName, + Cmd: args, + AttachStderr: true, + AttachStdout: true, + } + + if len(workdir) > 0 { + create_config.WorkingDir = workdir } if _, ok := self.client.InspectImage(containerName); ok != nil { @@ -43,8 +45,8 @@ func (self *DockerDirect) Run(containerName string, args []string, binds[] strin if len(tmp) > 1 { tag = tmp[1] } - pull_opt := docker.PullImageOptions{ Repository: rep, Tag: tag } - if ok := self.client.PullImage( pull_opt, docker.AuthConfiguration{} ); ok != nil { + pull_opt := docker.PullImageOptions{Repository: rep, Tag: tag} + if ok := self.client.PullImage(pull_opt, docker.AuthConfiguration{}); ok != nil { log.Printf("Image not pulled: %s", ok) return -1, ok } @@ -60,7 +62,7 @@ func (self *DockerDirect) Run(containerName string, args []string, binds[] strin } log.Printf("Starting Docker (mount: %s): %s", strings.Join(binds, ","), strings.Join(args, " ")) - err = self.client.StartContainer(container.ID, &docker.HostConfig { + err = self.client.StartContainer(container.ID, &docker.HostConfig{ Binds: binds, }) @@ -71,8 +73,11 @@ func (self *DockerDirect) Run(containerName string, args []string, binds[] strin log.Printf("Attaching Container: %s", container.ID) exit_code, err := self.client.WaitContainer(container.ID) + if err != nil { + log.Printf("Docker run Error: %s", err) + } - logOpts := docker.LogsOptions{Container:container.ID, Stdout:false, Stderr:false} + logOpts := docker.LogsOptions{Container: container.ID, Stdout: false, Stderr: false} if stdout != nil { logOpts.Stdout = true @@ -89,6 +94,6 @@ func (self *DockerDirect) Run(containerName string, args []string, binds[] strin } else { log.Printf("docker %s complete", container.ID, err) } - self.client.RemoveContainer(docker.RemoveContainerOptions{ID:container.ID,RemoveVolumes:true}) + self.client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, RemoveVolumes: true}) return exit_code, nil -} \ No newline at end of file +} diff --git a/src/tes/worker/container_engine.go b/src/tes/worker/container_engine.go new file mode 100644 index 0000000..72d2d08 --- /dev/null +++ b/src/tes/worker/container_engine.go @@ -0,0 +1,108 @@ +package tes_taskengine_worker + +import ( + "github.com/docker/engine-api/client" + "github.com/docker/engine-api/types" + "github.com/docker/engine-api/types/container" + "github.com/docker/engine-api/types/network" + "golang.org/x/net/context" + "io" + "log" + "os" + "strings" +) + +type DockerEngine struct { + client *client.Client +} + +func NewDockerEngine() *DockerEngine { + client, err := client.NewEnvClient() + if err != nil { + log.Printf("Docker Error\n") + return nil + } + return &DockerEngine{client: client} +} + +func (self *DockerEngine) Run(containerName string, args []string, + binds []string, workdir string, remove bool, stdout *os.File, stderr *os.File) (int, error) { + + list, err := self.client.ImageList(context.Background(), types.ImageListOptions{MatchName: containerName}) + + if err != nil || len(list) == 0 { + log.Printf("Image %s not found: %s", containerName, err) + pull_opt := types.ImagePullOptions{} + r, err := self.client.ImagePull(context.Background(), containerName, pull_opt) + if err != nil { + log.Printf("Image not pulled: %s", err) + return -1, err + } + for { + l := make([]byte, 1000) + _, e := r.Read(l) + if e == io.EOF { + break + } + log.Printf("%s", l) + } + r.Close() + log.Printf("Image Pulled") + } + + container, err := self.client.ContainerCreate(context.Background(), + &container.Config{Cmd: args, Image: containerName, Tty: true}, + &container.HostConfig{Binds: binds}, + &network.NetworkingConfig{}, + "", + ) + + if err != nil { + log.Printf("Docker run Error: %s", err) + return 0, err + } + + log.Printf("Starting Docker (mount: %s): %s", strings.Join(binds, ","), strings.Join(args, " ")) + err = self.client.ContainerStart(context.Background(), container.ID, types.ContainerStartOptions{}) + + if err != nil { + log.Printf("Docker run Error: %s", err) + return 0, err + } + + log.Printf("Attaching Container: %s", container.ID) + exit_code, err := self.client.ContainerWait(context.Background(), container.ID) + if err != nil { + log.Printf("docker %s error: %s", container.ID, err) + } else { + log.Printf("docker %s complete", container.ID) + } + + if stdout != nil { + stdout_log, _ := self.client.ContainerLogs(context.Background(), container.ID, types.ContainerLogsOptions{ShowStdout: true, Details: false}) + buffer := make([]byte, 10240) + for { + l, e := stdout_log.Read(buffer) + if e == io.EOF { + break + } + stdout.Write(buffer[:l]) + } + stdout_log.Close() + } + + if stderr != nil { + stderr_log, _ := self.client.ContainerLogs(context.Background(), container.ID, types.ContainerLogsOptions{ShowStderr: true}) + buffer := make([]byte, 10240) + for { + l, e := stderr_log.Read(buffer) + if e == io.EOF { + break + } + stderr.Write(buffer[:l]) + } + stderr_log.Close() + } + self.client.ContainerRemove(context.Background(), container.ID, types.ContainerRemoveOptions{RemoveVolumes: true}) + return exit_code, nil +} diff --git a/src/ga4gh-engine/worker/engine.go b/src/tes/worker/engine.go similarity index 90% rename from src/ga4gh-engine/worker/engine.go rename to src/tes/worker/engine.go index 3261cdb..0d4caa2 100644 --- a/src/ga4gh-engine/worker/engine.go +++ b/src/tes/worker/engine.go @@ -1,13 +1,13 @@ -package ga4gh_taskengine_worker +package tes_taskengine_worker import ( - "os" "fmt" - "ga4gh-tasks" + "tes/ga4gh" + "os" ) - const HEADER_SIZE = int64(102400) + func read_file_head(path string) []byte { f, _ := os.Open(path) buffer := make([]byte, HEADER_SIZE) @@ -16,7 +16,6 @@ func read_file_head(path string) []byte { return buffer[:l] } - func RunJob(job *ga4gh_task_exec.Job, mapper FileMapper) error { mapper.Job(job.JobId) @@ -26,14 +25,14 @@ func RunJob(job *ga4gh_task_exec.Job, mapper FileMapper) error { } for _, input := range job.Task.Inputs { - err := mapper.MapInput(job.JobId, input.Location, input.Path, input.Directory) + err := mapper.MapInput(job.JobId, input.Location, input.Path, input.Class) if err != nil { return err } } - for _, output := range(job.Task.Outputs) { - err := mapper.MapOutput(job.JobId, output.Location, output.Path, output.Directory) + for _, output := range job.Task.Outputs { + err := mapper.MapOutput(job.JobId, output.Location, output.Path, output.Class, output.Create) if err != nil { return err } @@ -57,8 +56,8 @@ func RunJob(job *ga4gh_task_exec.Job, mapper FileMapper) error { } binds := mapper.GetBindings(job.JobId) - dclient := NewDockerDirect() - exit_code, err := dclient.Run(dockerTask.ImageName, dockerTask.Cmd, binds, true, stdout, stderr) + dclient := NewDockerEngine() + exit_code, err := dclient.Run(dockerTask.ImageName, dockerTask.Cmd, binds, dockerTask.Workdir, true, stdout, stderr) stdout.Close() stderr.Close() @@ -89,4 +88,4 @@ func RunJob(job *ga4gh_task_exec.Job, mapper FileMapper) error { mapper.FinalizeJob(job.JobId) return nil -} \ No newline at end of file +} diff --git a/src/tes/worker/file_client.go b/src/tes/worker/file_client.go new file mode 100644 index 0000000..63a96b5 --- /dev/null +++ b/src/tes/worker/file_client.go @@ -0,0 +1,142 @@ +package tes_taskengine_worker + +import ( + "fmt" + "tes/server/proto" + "tes/ga4gh" + "golang.org/x/net/context" + "io/ioutil" + "os" + "path" +) + +type FileMapper struct { + fileSystem FileSystemAccess + VolumeDir string + client *ga4gh_task_ref.SchedulerClient + jobs map[string]*JobFileMapper +} + +type JobFileMapper struct { + JobId string + WorkDir string + Bindings []FSBinding + Outputs []ga4gh_task_exec.TaskParameter +} + +type FileSystemAccess interface { + Get(storage string, path string) error + Put(storage string, path string, class string) error +} + +type EngineStatus struct { + JobCount int32 + ActiveJobs int32 +} + +type FSBinding struct { + HostPath string + ContainerPath string + Mode string +} + +func NewFileMapper(client *ga4gh_task_ref.SchedulerClient, fileSystem FileSystemAccess, volumeDir string) *FileMapper { + if _, err := os.Stat(volumeDir); os.IsNotExist(err) { + os.Mkdir(volumeDir, 0700) + } + return &FileMapper{VolumeDir: volumeDir, jobs: make(map[string]*JobFileMapper), client: client, fileSystem: fileSystem} +} + +func (self *FileMapper) Job(jobId string) { + //create a working 'disk' for runtime files + w := path.Join(self.VolumeDir, jobId) + if _, err := os.Stat(w); err != nil { + os.Mkdir(w, 0700) + } + a := JobFileMapper{JobId: jobId, WorkDir: w} + self.jobs[jobId] = &a +} + +func (self *FileMapper) AddVolume(jobId string, source string, mount string) { + tmpPath, _ := ioutil.TempDir(self.VolumeDir, fmt.Sprintf("job_%s", jobId)) + b := FSBinding{ + HostPath: tmpPath, + ContainerPath: mount, + Mode: "rw", + } + j := self.jobs[jobId] + j.Bindings = append(j.Bindings, b) +} + +func (self *FileMapper) HostPath(jobId string, mountPath string) string { + for _, vol := range self.jobs[jobId].Bindings { + base, relpath := pathMatch(vol.ContainerPath, mountPath) + if len(base) > 0 { + return path.Join(vol.HostPath, relpath) + } + } + return "" +} + +func (self *FileMapper) MapInput(jobId string, storage string, mountPath string, class string) error { + for _, vol := range self.jobs[jobId].Bindings { + base, relpath := pathMatch(vol.ContainerPath, mountPath) + if len(base) > 0 { + dstPath := path.Join(vol.HostPath, relpath) + fmt.Printf("get %s %s\n", storage, dstPath) + err := self.fileSystem.Get(storage, dstPath) + if err != nil { + return err + } + } + } + return nil +} + +func (self *FileMapper) MapOutput(jobId string, storage string, mountPath string, class string, create bool) error { + a := ga4gh_task_exec.TaskParameter{Location: storage, Path: mountPath, Create: create, Class: class} + j := self.jobs[jobId] + if create { + for _, vol := range self.jobs[jobId].Bindings { + base, relpath := pathMatch(vol.ContainerPath, mountPath) + if len(base) > 0 { + if class == "Directory" { + os.MkdirAll(path.Join(vol.HostPath, relpath), 0777) + } else if class == "File" { + ioutil.WriteFile(path.Join(vol.HostPath, relpath), []byte{}, 0777) + } else { + return fmt.Errorf("Unknown class type: %s", class) + } + } + } + } + j.Outputs = append(j.Outputs, a) + return nil +} + +func (self *FileMapper) GetBindings(jobId string) []string { + out := make([]string, 0, 10) + for _, c := range self.jobs[jobId].Bindings { + o := fmt.Sprintf("%s:%s:%s", c.HostPath, c.ContainerPath, c.Mode) + out = append(out, o) + } + return out +} + +func (self *FileMapper) UpdateOutputs(jobId string, jobNum int, exitCode int, stdoutText string, stderrText string) { + log := ga4gh_task_exec.JobLog{Stdout: stdoutText, Stderr: stderrText, ExitCode: int32(exitCode)} + a := ga4gh_task_ref.UpdateStatusRequest{Id: jobId, Step: int64(jobNum), Log: &log} + (*self.client).UpdateJobStatus(context.Background(), &a) +} + +func (self *FileMapper) TempFile(jobId string) (f *os.File, err error) { + out, err := ioutil.TempFile(self.jobs[jobId].WorkDir, "ga4ghtask_") + return out, err +} + +func (self *FileMapper) FinalizeJob(jobId string) { + for _, out := range self.jobs[jobId].Outputs { + hst := self.HostPath(jobId, out.Path) + self.fileSystem.Put(out.Location, hst, out.Class) + } +} diff --git a/src/tes/worker/fork_engine.go b/src/tes/worker/fork_engine.go new file mode 100644 index 0000000..7a3c9be --- /dev/null +++ b/src/tes/worker/fork_engine.go @@ -0,0 +1,101 @@ +package tes_taskengine_worker + +import ( + "tes/server/proto" + "tes/ga4gh" + context "golang.org/x/net/context" + "log" + "os" + "sync/atomic" + "time" + //proto "github.com/golang/protobuf/proto" +) + +type ForkManager struct { + procCount int + running bool + files FileMapper + sched ga4gh_task_ref.SchedulerClient + workerId string + ctx context.Context + check_func func(status EngineStatus) + status EngineStatus +} + +func (self *ForkManager) worker(inchan chan ga4gh_task_exec.Job) { + for job := range inchan { + atomic.AddInt32(&self.status.ActiveJobs, 1) + atomic.AddInt32(&self.status.JobCount, 1) + log.Printf("Launch job: %s", job) + s := ga4gh_task_exec.State_Running + self.sched.UpdateJobStatus(self.ctx, &ga4gh_task_ref.UpdateStatusRequest{Id: job.JobId, State: s}) + err := RunJob(&job, self.files) + if err != nil { + log.Printf("Job error: %s", err) + self.sched.UpdateJobStatus(self.ctx, &ga4gh_task_ref.UpdateStatusRequest{Id: job.JobId, State: ga4gh_task_exec.State_Error}) + } else { + self.sched.UpdateJobStatus(self.ctx, &ga4gh_task_ref.UpdateStatusRequest{Id: job.JobId, State: ga4gh_task_exec.State_Complete}) + } + atomic.AddInt32(&self.status.ActiveJobs, -1) + } +} + +func (self *ForkManager) watcher(sched ga4gh_task_ref.SchedulerClient, filestore FileMapper) { + self.sched = sched + self.files = filestore + hostname, _ := os.Hostname() + jobchan := make(chan ga4gh_task_exec.Job, 10) + for i := 0; i < self.procCount; i++ { + go self.worker(jobchan) + } + var sleep_size int64 = 1 + for self.running { + if self.check_func != nil { + self.check_func(self.status) + } + task, err := self.sched.GetJobToRun(self.ctx, + &ga4gh_task_ref.JobRequest{ + Worker: &ga4gh_task_ref.WorkerInfo{ + Id: self.workerId, + Hostname: hostname, + LastPing: time.Now().Unix(), + }, + }) + if err != nil { + log.Print(err) + } + if task != nil && task.Job != nil { + sleep_size = 1 + log.Printf("Found job: %s", task) + jobchan <- *task.Job + } else { + //log.Printf("No jobs found") + if sleep_size < 20 { + // sleep_size += 1 + } + time.Sleep(time.Second * time.Duration(sleep_size)) + } + } + close(jobchan) +} + +func (self *ForkManager) Start(engine ga4gh_task_ref.SchedulerClient, files FileMapper) { + go self.watcher(engine, files) +} + +func (self *ForkManager) Run(engine ga4gh_task_ref.SchedulerClient, files FileMapper) { + self.watcher(engine, files) +} + +func (self *ForkManager) SetStatusCheck(check_func func(status EngineStatus)) { + self.check_func = check_func +} + +func NewLocalManager(procCount int, workerId string) (*ForkManager, error) { + return &ForkManager{ + procCount: procCount, + running: true, + workerId: workerId, + ctx: context.Background(), + }, nil +} diff --git a/src/tes/worker/fs_client.go b/src/tes/worker/fs_client.go new file mode 100644 index 0000000..d55e708 --- /dev/null +++ b/src/tes/worker/fs_client.go @@ -0,0 +1,51 @@ +package tes_taskengine_worker + +import ( + "fmt" + "log" + "os" + "path" + "strings" +) + +type FileStorageAccess struct { + StorageDir string +} + +func NewSharedFS(base string) *FileStorageAccess { + return &FileStorageAccess{StorageDir: base} +} + +func (self *FileStorageAccess) Get(storage string, hostPath string) error { + storage = strings.TrimPrefix(storage, "fs://") + srcPath := path.Join(self.StorageDir, storage) + if _, err := os.Stat(srcPath); os.IsNotExist(err) { + return fmt.Errorf("storage file '%s' not found", srcPath) + } + copyFileContents(srcPath, hostPath) + return nil +} + +func (self *FileStorageAccess) Put(location string, hostPath string, class string) error { + + storage := strings.TrimPrefix(location, "fs://") + + log.Printf("copy out %s %s\n", hostPath, path.Join(self.StorageDir, storage)) + //copy to storage directory + if class == "Directory" { + err := CopyDir(hostPath, path.Join(self.StorageDir, storage)) + if err != nil { + log.Printf("Error copying output directory %s to %s", hostPath, location) + return err + } + } else if class == "File" { + err := CopyFile(hostPath, path.Join(self.StorageDir, storage)) + if err != nil { + log.Printf("Error copying output file %s to %s", hostPath, location) + return err + } + } else { + return fmt.Errorf("Unknown Class type: %s", class) + } + return nil +} diff --git a/src/tes/worker/swift_client.go b/src/tes/worker/swift_client.go new file mode 100644 index 0000000..bdc1060 --- /dev/null +++ b/src/tes/worker/swift_client.go @@ -0,0 +1,92 @@ +package tes_taskengine_worker + +import ( + "fmt" + "github.com/rackspace/gophercloud" + "github.com/rackspace/gophercloud/openstack" + "io" + "log" + "os" + "strings" + //"github.com/rackspace/gophercloud/openstack/objectstorage/v1/containers" + "github.com/rackspace/gophercloud/openstack/objectstorage/v1/objects" + //"github.com/rackspace/gophercloud/pagination" +) + +var SWIFT_PROTOCOL = "swift://" + +type SwiftAccess struct { + client *gophercloud.ServiceClient +} + +func NewSwiftAccess() *SwiftAccess { + + opts, err := openstack.AuthOptionsFromEnv() + provider, err := openstack.AuthenticatedClient(opts) + + if err != nil { + panic("Authentication Error") + } + + swift_client, err := openstack.NewObjectStorageV1(provider, gophercloud.EndpointOpts{}) + if err != nil { + panic("Storage Connection Error") + } + + return &SwiftAccess{client: swift_client} + +} + +func (self *SwiftAccess) Get(storage string, hostPath string) error { + log.Printf("Starting download of %s", storage) + storage = strings.TrimPrefix(storage, SWIFT_PROTOCOL) + storage_split := strings.SplitN(storage, "/", 2) + + // Download everything into a DownloadResult struct + opts := objects.DownloadOpts{} + res := objects.Download(self.client, storage_split[0], storage_split[1], opts) + + file, err := os.Create(hostPath) + if err != nil { + return err + } + buffer := make([]byte, 10240) + total_len := 0 + for { + len, err := res.Body.Read(buffer) + if err == io.EOF { + break + } else if err != nil { + return fmt.Errorf("Error reading file") + } + total_len += len + file.Write(buffer[:len]) + } + file.Close() + res.Body.Close() + log.Printf("Downloaded %d bytes", total_len) + return nil +} + +func (self *SwiftAccess) Put(storage string, hostPath string, class string) error { + log.Printf("Starting upload of %s", storage) + content, err := os.Open(hostPath) + if err != nil { + return err + } + + storage = strings.TrimPrefix(storage, SWIFT_PROTOCOL) + storage_split := strings.SplitN(storage, "/", 2) + + if class == "File" { + // Now execute the upload + opts := objects.CreateOpts{} + res := objects.Create(self.client, storage_split[0], storage_split[1], content, opts) + _, err = res.ExtractHeader() + content.Close() + return err + } else if class == "Directory" { + return fmt.Errorf("SWIFT directories not yet supported") + } + return fmt.Errorf("Unknown element type: %s", class) +} diff --git a/src/ga4gh-engine/worker/util.go b/src/tes/worker/util.go similarity index 52% rename from src/ga4gh-engine/worker/util.go rename to src/tes/worker/util.go index 47c0f91..0c87370 100644 --- a/src/ga4gh-engine/worker/util.go +++ b/src/tes/worker/util.go @@ -1,14 +1,23 @@ - -package ga4gh_taskengine_worker +package tes_taskengine_worker import ( - "os" - "io" "fmt" + "io" + "os" "path" ) - +func pathMatch(base string, query string) (string, string) { + if path.Clean(base) == path.Clean(query) { + return query, "" + } + dir, file := path.Split(query) + if len(dir) > 1 { + d, p := pathMatch(base, dir) + return d, path.Join(p, file) + } + return "", "" +} func copyFileContents(src, dst string) (err error) { in, err := os.Open(src) @@ -66,3 +75,39 @@ func CopyFile(src, dst string) (err error) { err = copyFileContents(src, dst) return } + +func CopyDir(source string, dest string) (err error) { + // get properties of source dir + sourceinfo, err := os.Stat(source) + if err != nil { + return err + } + + // create dest dir + err = os.MkdirAll(dest, sourceinfo.Mode()) + if err != nil { + return err + } + + directory, _ := os.Open(source) + objects, err := directory.Readdir(-1) + for _, obj := range objects { + sourcefilepointer := source + "/" + obj.Name() + destinationfilepointer := dest + "/" + obj.Name() + if obj.IsDir() { + // create sub-directories - recursively + err = CopyDir(sourcefilepointer, destinationfilepointer) + if err != nil { + fmt.Println(err) + } + } else { + // perform copy + err = CopyFile(sourcefilepointer, destinationfilepointer) + if err != nil { + fmt.Println(err) + } + } + + } + return +} diff --git a/task-execution-schemas b/task-execution-schemas index 6d05daf..afc5cc9 160000 --- a/task-execution-schemas +++ b/task-execution-schemas @@ -1 +1 @@ -Subproject commit 6d05daf03b3982a7a8cfa7d21aec6e1e4594be90 +Subproject commit afc5cc9fc6a81ef46cba8a491cfa4276f97c4f24 diff --git a/tests/test_fileop.py b/tests/test_fileop.py index 49431d1..1284721 100644 --- a/tests/test_fileop.py +++ b/tests/test_fileop.py @@ -24,12 +24,14 @@ def test_file_mount(self): "name" : "infile", "description" : "File to be MD5ed", "location" : in_loc, + "class" : "File", "path" : "/tmp/test_file" } ], "outputs" : [ { "location" : out_loc, + "class" : "File", "path" : "/tmp/test_out" } ],