From 8335bfc18cc8d4b63551c1b3871fbb2b77192106 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Thu, 25 Aug 2016 14:13:56 -0700 Subject: [PATCH] Adding deployment script and swift advertisment --- deploy-ubuntu.sh | 10 +++++ src/ga4gh-server/task_boltdb.go | 69 ++++++++++++++---------------- src/ga4gh-server/task_interface.go | 8 ++-- src/ga4gh-server/task_server.go | 15 +++---- src/ga4gh-taskserver/server.go | 51 ++++++++++++---------- 5 files changed, 78 insertions(+), 75 deletions(-) create mode 100644 deploy-ubuntu.sh diff --git a/deploy-ubuntu.sh b/deploy-ubuntu.sh new file mode 100644 index 0000000..751d786 --- /dev/null +++ b/deploy-ubuntu.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +sudo apt-get update +sudo apt install -y golang-go + +git clone https://github.com/bmeg/task-execution-server.git +git checkout scaling + +make depends +make diff --git a/src/ga4gh-server/task_boltdb.go b/src/ga4gh-server/task_boltdb.go index 417662c..9eca6ea 100644 --- a/src/ga4gh-server/task_boltdb.go +++ b/src/ga4gh-server/task_boltdb.go @@ -1,19 +1,17 @@ - package ga4gh_task import ( - "golang.org/x/net/context" + "fmt" + "ga4gh-server/proto" "ga4gh-tasks" "github.com/boltdb/bolt" - uuid "github.com/nu7hatch/gouuid" proto "github.com/golang/protobuf/proto" - "fmt" + uuid "github.com/nu7hatch/gouuid" + "golang.org/x/net/context" "log" "strings" - "ga4gh-server/proto" ) - var TASK_BUCKET = []byte("tasks") var JOBS_QUEUED = []byte("jobs-queued") @@ -23,36 +21,34 @@ var JOBS_COMPLETE = []byte("jobs-complete") var JOBS_LOG = []byte("jobs-log") type TaskBolt struct { - db *bolt.DB + db *bolt.DB storage_metadata map[string]string } - func NewTaskBolt(path string, storage_metadata map[string]string) *TaskBolt { db, _ := bolt.Open(path, 0600, nil) //Check to make sure all the required buckets have been created db.Update(func(tx *bolt.Tx) error { - if (tx.Bucket(TASK_BUCKET) == nil) { + if tx.Bucket(TASK_BUCKET) == nil { tx.CreateBucket(TASK_BUCKET) } - if (tx.Bucket(JOBS_QUEUED) == nil) { + if tx.Bucket(JOBS_QUEUED) == nil { tx.CreateBucket(JOBS_QUEUED) } - if (tx.Bucket(JOBS_ACTIVE) == nil) { + if tx.Bucket(JOBS_ACTIVE) == nil { tx.CreateBucket(JOBS_ACTIVE) } - if (tx.Bucket(JOBS_COMPLETE) == nil) { + if tx.Bucket(JOBS_COMPLETE) == nil { tx.CreateBucket(JOBS_COMPLETE) } - if (tx.Bucket(JOBS_LOG) == nil) { + if tx.Bucket(JOBS_LOG) == nil { tx.CreateBucket(JOBS_LOG) } return nil }) - return &TaskBolt{db:db, storage_metadata:storage_metadata} + return &TaskBolt{db: db, storage_metadata: storage_metadata} } - // / Run a task func (self *TaskBolt) RunTask(ctx context.Context, task *ga4gh_task_exec.Task) (*ga4gh_task_exec.JobId, error) { log.Println("Recieving Task for Queue", task) @@ -60,7 +56,7 @@ func (self *TaskBolt) RunTask(ctx context.Context, task *ga4gh_task_exec.Task) ( taskopId, _ := uuid.NewV4() task.TaskId = taskopId.String() - if (len(task.Docker) == 0) { + if len(task.Docker) == 0 { return nil, fmt.Errorf("No docker commands found") } @@ -82,21 +78,20 @@ func (self *TaskBolt) RunTask(ctx context.Context, task *ga4gh_task_exec.Task) ( taskop_b := tx.Bucket(TASK_BUCKET) v, _ := proto.Marshal(task) - taskop_b.Put( []byte(taskopId.String()), v ) + taskop_b.Put([]byte(taskopId.String()), v) queue_b := tx.Bucket(JOBS_QUEUED) queue_b.Put([]byte(taskopId.String()), []byte(ga4gh_task_exec.State_Queued.String())) - ch <- &ga4gh_task_exec.JobId{Value:taskopId.String()} + ch <- &ga4gh_task_exec.JobId{Value: taskopId.String()} return nil }) if err != nil { return nil, err } - a := <- ch + a := <-ch return a, err } - func (self *TaskBolt) getTaskJob(task *ga4gh_task_exec.Task) (*ga4gh_task_exec.Job, error) { ch := make(chan *ga4gh_task_exec.Job, 1) self.db.View(func(tx *bolt.Tx) error { @@ -137,7 +132,7 @@ func (self *TaskBolt) getTaskJob(task *ga4gh_task_exec.Task) (*ga4gh_task_exec.J ch <- &job return nil }) - a := <- ch + a := <-ch return a, nil } @@ -153,8 +148,8 @@ func (self *TaskBolt) GetJob(ctx context.Context, job *ga4gh_task_exec.JobId) (* ch <- &out return nil }) - a := <- ch - if (a == nil) { + a := <-ch + if a == nil { return nil, fmt.Errorf("Job Not Found") } b, err := self.getTaskJob(a) @@ -207,12 +202,11 @@ func (self *TaskBolt) CancelJob(ctx context.Context, taskop *ga4gh_task_exec.Job return taskop, nil } - func (self *TaskBolt) GetJobToRun(ctx context.Context, request *ga4gh_task_ref.JobRequest) (*ga4gh_task_ref.JobResponse, error) { //log.Printf("Job Request") ch := make(chan *ga4gh_task_exec.Task, 1) - self.db.Update(func (tx *bolt.Tx) error { + self.db.Update(func(tx *bolt.Tx) error { b_q := tx.Bucket(JOBS_QUEUED) b_a := tx.Bucket(JOBS_ACTIVE) b_op := tx.Bucket(TASK_BUCKET) @@ -232,17 +226,17 @@ func (self *TaskBolt) GetJobToRun(ctx context.Context, request *ga4gh_task_ref.J ch <- nil return nil }) - a := <- ch - if (a == nil) { + a := <-ch + if a == nil { return &ga4gh_task_ref.JobResponse{}, nil } job := &ga4gh_task_exec.Job{ - JobId:a.TaskId, - Task:a, + JobId: a.TaskId, + Task: a, } - return &ga4gh_task_ref.JobResponse{Job:job}, nil + return &ga4gh_task_ref.JobResponse{Job: job}, nil } func (self *TaskBolt) UpdateJobStatus(ctx context.Context, stat *ga4gh_task_ref.UpdateStatusRequest) (*ga4gh_task_exec.JobId, error) { @@ -253,9 +247,9 @@ func (self *TaskBolt) UpdateJobStatus(ctx context.Context, stat *ga4gh_task_ref. b_l := tx.Bucket(JOBS_LOG) if stat.Log != nil { - log.Printf("Logging stdout:%s", stat.Log.Stdout ) + log.Printf("Logging stdout:%s", stat.Log.Stdout) d, _ := proto.Marshal(stat.Log) - b_l.Put([]byte(fmt.Sprint("%s-%d", stat.Id, stat.Step)), d ) + b_l.Put([]byte(fmt.Sprint("%s-%d", stat.Id, stat.Step)), d) } switch stat.State { @@ -265,7 +259,7 @@ func (self *TaskBolt) UpdateJobStatus(ctx context.Context, stat *ga4gh_task_ref. } return nil }) - return &ga4gh_task_exec.JobId{Value:stat.Id}, nil + return &ga4gh_task_exec.JobId{Value: stat.Id}, nil } func (self *TaskBolt) WorkerPing(ctx context.Context, info *ga4gh_task_ref.WorkerInfo) (*ga4gh_task_ref.WorkerInfo, error) { @@ -273,9 +267,8 @@ func (self *TaskBolt) WorkerPing(ctx context.Context, info *ga4gh_task_ref.Worke return info, nil } - func (self *TaskBolt) GetServiceInfo(ctx context.Context, info *ga4gh_task_exec.ServiceInfoRequest) (*ga4gh_task_exec.ServiceInfo, error) { - return &ga4gh_task_exec.ServiceInfo{StorageConfig:self.storage_metadata}, nil + return &ga4gh_task_exec.ServiceInfo{StorageConfig: self.storage_metadata}, nil } func (self *TaskBolt) GetQueueInfo(request *ga4gh_task_ref.QueuedTaskInfoRequest, server ga4gh_task_ref.Scheduler_GetQueueInfoServer) error { @@ -287,7 +280,7 @@ func (self *TaskBolt) GetQueueInfo(request *ga4gh_task_ref.QueuedTaskInfoRequest c := bq.Cursor() var count int32 = 0 for k, v := c.First(); k != nil && count < request.MaxTasks; k, v = c.Next() { - if (string(v) == ga4gh_task_exec.State_Queued.String()) { + if string(v) == ga4gh_task_exec.State_Queued.String() { v := bt.Get(k) out := ga4gh_task_exec.Task{} proto.Unmarshal(v, &out) @@ -303,7 +296,7 @@ func (self *TaskBolt) GetQueueInfo(request *ga4gh_task_ref.QueuedTaskInfoRequest for _, i := range m.Inputs { inputs = append(inputs, i.Location) } - server.Send(&ga4gh_task_ref.QueuedTaskInfo{Inputs:inputs, Resources:m.Resources}) + server.Send(&ga4gh_task_ref.QueuedTaskInfo{Inputs: inputs, Resources: m.Resources}) } return nil -} \ No newline at end of file +} diff --git a/src/ga4gh-server/task_interface.go b/src/ga4gh-server/task_interface.go index 7b0e604..d40c4ec 100644 --- a/src/ga4gh-server/task_interface.go +++ b/src/ga4gh-server/task_interface.go @@ -1,15 +1,13 @@ - package ga4gh_task import ( - //"golang.org/x/net/context" - //"ga4gh-tasks" +//"golang.org/x/net/context" +//"ga4gh-tasks" ) type TaskImpl struct { } - func NewTaskImpl() *TaskImpl { return &TaskImpl{} } @@ -45,4 +43,4 @@ func (self *TaskImpl) CancelTaskOp(context.Context, *ga4gh_task_exec.TaskOpId) ( } -*/ \ No newline at end of file +*/ diff --git a/src/ga4gh-server/task_server.go b/src/ga4gh-server/task_server.go index c471e3c..8823d0d 100644 --- a/src/ga4gh-server/task_server.go +++ b/src/ga4gh-server/task_server.go @@ -1,23 +1,22 @@ - package ga4gh_task import ( + "ga4gh-server/proto" + "ga4gh-tasks" + "google.golang.org/grpc" "log" "net" - "google.golang.org/grpc" - "ga4gh-tasks" - "ga4gh-server/proto" ) /// Common GA4GH server, multiple services could be placed into the same server /// For the moment there is just the task server type GA4GHServer struct { - task ga4gh_task_exec.TaskServiceServer + task ga4gh_task_exec.TaskServiceServer sched ga4gh_task_ref.SchedulerServer } func NewGA4GHServer() *GA4GHServer { - return &GA4GHServer {} + return &GA4GHServer{} } func (self *GA4GHServer) RegisterTaskServer(task ga4gh_task_exec.TaskServiceServer) { @@ -28,10 +27,8 @@ func (self *GA4GHServer) RegisterScheduleServer(sched ga4gh_task_ref.SchedulerSe self.sched = sched } - - func (self *GA4GHServer) Start(host_port string) { - lis, err := net.Listen("tcp", ":" + host_port) + lis, err := net.Listen("tcp", ":"+host_port) if err != nil { panic("Cannot open port") } diff --git a/src/ga4gh-taskserver/server.go b/src/ga4gh-taskserver/server.go index d605db0..f16e17a 100644 --- a/src/ga4gh-taskserver/server.go +++ b/src/ga4gh-taskserver/server.go @@ -1,35 +1,34 @@ package main import ( - "os" - "fmt" "flag" - "net/http" - "path/filepath" - "golang.org/x/net/context" + "fmt" + "ga4gh-engine" + "ga4gh-engine/scaling" + "ga4gh-server" + "ga4gh-tasks" "github.com/gorilla/mux" - "google.golang.org/grpc" "github.com/grpc-ecosystem/grpc-gateway/runtime" - "ga4gh-tasks" - "ga4gh-server" - "runtime/debug" + "golang.org/x/net/context" + "google.golang.org/grpc" "log" - "ga4gh-engine" - "ga4gh-engine/scaling" + "net/http" + "os" + "path/filepath" + "runtime/debug" ) - - func main() { http_port := flag.String("port", "8000", "HTTP Port") rpc_port := flag.String("rpc", "9090", "HTTP Port") storage_dir_arg := flag.String("storage", "storage", "Storage Dir") + swift_arg := flag.Bool("swift", false, "Use SWIFT object store") task_db := flag.String("db", "ga4gh_tasks.db", "Task DB File") scaler_name := flag.String("scaler", "local", "Scaler") flag.Parse() - - dir, _ := filepath.Abs(os.Args[0]) + + dir, _ := filepath.Abs(os.Args[0]) content_dir := filepath.Join(dir, "..", "..", "share") config := map[string]string{} @@ -39,10 +38,16 @@ func main() { //server meta-data storage_dir, _ := filepath.Abs(*storage_dir_arg) - meta_data := map[string]string{ "storageType" : "sharedFile", "baseDir" : storage_dir } + var meta_data = make(map[string]string) + if !*swift_arg { + meta_data["storageType"] = "sharedFile" + meta_data["baseDir"] = storage_dir + } else { + meta_data["storageType"] = "swift" + } //setup GRPC listener - taski := ga4gh_task.NewTaskBolt(*task_db, meta_data) //ga4gh_task.NewTaskImpl() + taski := ga4gh_task.NewTaskBolt(*task_db, meta_data) //setup scheduler scheduler := ga4gh_taskengine.Scheduler(taski, scaler) @@ -58,9 +63,9 @@ func main() { ctx, cancel := context.WithCancel(ctx) defer cancel() opts := []grpc.DialOption{grpc.WithInsecure()} - log.Println("Proxy connecting to localhost:" + *rpc_port ) - err := ga4gh_task_exec.RegisterTaskServiceHandlerFromEndpoint(ctx, grpc_mux, "localhost:" + *rpc_port, opts) - if (err != nil) { + log.Println("Proxy connecting to localhost:" + *rpc_port) + err := ga4gh_task_exec.RegisterTaskServiceHandlerFromEndpoint(ctx, grpc_mux, "localhost:"+*rpc_port, opts) + if err != nil { fmt.Println("Register Error", err) } @@ -74,12 +79,12 @@ func main() { } // Routes consist of a path and a handler function r.HandleFunc("/", - func (w http.ResponseWriter, r *http.Request) { + func(w http.ResponseWriter, r *http.Request) { http.ServeFile(w, r, filepath.Join(content_dir, "index.html")) }) r.PathPrefix("/static/").Handler(http.StripPrefix("/static/", http.FileServer(http.Dir(content_dir)))) r.PathPrefix("/v1/").Handler(grpc_mux) log.Printf("Listening on port: %s\n", *http_port) - http.ListenAndServe(":" + *http_port, r) -} \ No newline at end of file + http.ListenAndServe(":"+*http_port, r) +}