Skip to content

Commit

Permalink
Adding deployment script and swift advertisment
Browse files Browse the repository at this point in the history
  • Loading branch information
kellrott committed Aug 25, 2016
1 parent 01f90a7 commit 8335bfc
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 75 deletions.
10 changes: 10 additions & 0 deletions deploy-ubuntu.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/bash

sudo apt-get update
sudo apt install -y golang-go

git clone https://github.com/bmeg/task-execution-server.git
git checkout scaling

make depends
make
69 changes: 31 additions & 38 deletions src/ga4gh-server/task_boltdb.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@

package ga4gh_task

import (
"golang.org/x/net/context"
"fmt"
"ga4gh-server/proto"
"ga4gh-tasks"
"github.com/boltdb/bolt"
uuid "github.com/nu7hatch/gouuid"
proto "github.com/golang/protobuf/proto"
"fmt"
uuid "github.com/nu7hatch/gouuid"
"golang.org/x/net/context"
"log"
"strings"
"ga4gh-server/proto"
)


var TASK_BUCKET = []byte("tasks")

var JOBS_QUEUED = []byte("jobs-queued")
Expand All @@ -23,44 +21,42 @@ var JOBS_COMPLETE = []byte("jobs-complete")
var JOBS_LOG = []byte("jobs-log")

type TaskBolt struct {
db *bolt.DB
db *bolt.DB
storage_metadata map[string]string
}


func NewTaskBolt(path string, storage_metadata map[string]string) *TaskBolt {
db, _ := bolt.Open(path, 0600, nil)
//Check to make sure all the required buckets have been created
db.Update(func(tx *bolt.Tx) error {
if (tx.Bucket(TASK_BUCKET) == nil) {
if tx.Bucket(TASK_BUCKET) == nil {
tx.CreateBucket(TASK_BUCKET)
}
if (tx.Bucket(JOBS_QUEUED) == nil) {
if tx.Bucket(JOBS_QUEUED) == nil {
tx.CreateBucket(JOBS_QUEUED)
}
if (tx.Bucket(JOBS_ACTIVE) == nil) {
if tx.Bucket(JOBS_ACTIVE) == nil {
tx.CreateBucket(JOBS_ACTIVE)
}
if (tx.Bucket(JOBS_COMPLETE) == nil) {
if tx.Bucket(JOBS_COMPLETE) == nil {
tx.CreateBucket(JOBS_COMPLETE)
}
if (tx.Bucket(JOBS_LOG) == nil) {
if tx.Bucket(JOBS_LOG) == nil {
tx.CreateBucket(JOBS_LOG)
}
return nil
})
return &TaskBolt{db:db, storage_metadata:storage_metadata}
return &TaskBolt{db: db, storage_metadata: storage_metadata}
}


// / Run a task
func (self *TaskBolt) RunTask(ctx context.Context, task *ga4gh_task_exec.Task) (*ga4gh_task_exec.JobId, error) {
log.Println("Recieving Task for Queue", task)

taskopId, _ := uuid.NewV4()

task.TaskId = taskopId.String()
if (len(task.Docker) == 0) {
if len(task.Docker) == 0 {
return nil, fmt.Errorf("No docker commands found")
}

Expand All @@ -82,21 +78,20 @@ func (self *TaskBolt) RunTask(ctx context.Context, task *ga4gh_task_exec.Task) (

taskop_b := tx.Bucket(TASK_BUCKET)
v, _ := proto.Marshal(task)
taskop_b.Put( []byte(taskopId.String()), v )
taskop_b.Put([]byte(taskopId.String()), v)

queue_b := tx.Bucket(JOBS_QUEUED)
queue_b.Put([]byte(taskopId.String()), []byte(ga4gh_task_exec.State_Queued.String()))
ch <- &ga4gh_task_exec.JobId{Value:taskopId.String()}
ch <- &ga4gh_task_exec.JobId{Value: taskopId.String()}
return nil
})
if err != nil {
return nil, err
}
a := <- ch
a := <-ch
return a, err
}


func (self *TaskBolt) getTaskJob(task *ga4gh_task_exec.Task) (*ga4gh_task_exec.Job, error) {
ch := make(chan *ga4gh_task_exec.Job, 1)
self.db.View(func(tx *bolt.Tx) error {
Expand Down Expand Up @@ -137,7 +132,7 @@ func (self *TaskBolt) getTaskJob(task *ga4gh_task_exec.Task) (*ga4gh_task_exec.J
ch <- &job
return nil
})
a := <- ch
a := <-ch
return a, nil
}

Expand All @@ -153,8 +148,8 @@ func (self *TaskBolt) GetJob(ctx context.Context, job *ga4gh_task_exec.JobId) (*
ch <- &out
return nil
})
a := <- ch
if (a == nil) {
a := <-ch
if a == nil {
return nil, fmt.Errorf("Job Not Found")
}
b, err := self.getTaskJob(a)
Expand Down Expand Up @@ -207,12 +202,11 @@ func (self *TaskBolt) CancelJob(ctx context.Context, taskop *ga4gh_task_exec.Job
return taskop, nil
}


func (self *TaskBolt) GetJobToRun(ctx context.Context, request *ga4gh_task_ref.JobRequest) (*ga4gh_task_ref.JobResponse, error) {
//log.Printf("Job Request")
ch := make(chan *ga4gh_task_exec.Task, 1)

self.db.Update(func (tx *bolt.Tx) error {
self.db.Update(func(tx *bolt.Tx) error {
b_q := tx.Bucket(JOBS_QUEUED)
b_a := tx.Bucket(JOBS_ACTIVE)
b_op := tx.Bucket(TASK_BUCKET)
Expand All @@ -232,17 +226,17 @@ func (self *TaskBolt) GetJobToRun(ctx context.Context, request *ga4gh_task_ref.J
ch <- nil
return nil
})
a := <- ch
if (a == nil) {
a := <-ch
if a == nil {
return &ga4gh_task_ref.JobResponse{}, nil
}

job := &ga4gh_task_exec.Job{
JobId:a.TaskId,
Task:a,
JobId: a.TaskId,
Task: a,
}

return &ga4gh_task_ref.JobResponse{Job:job}, nil
return &ga4gh_task_ref.JobResponse{Job: job}, nil
}

func (self *TaskBolt) UpdateJobStatus(ctx context.Context, stat *ga4gh_task_ref.UpdateStatusRequest) (*ga4gh_task_exec.JobId, error) {
Expand All @@ -253,9 +247,9 @@ func (self *TaskBolt) UpdateJobStatus(ctx context.Context, stat *ga4gh_task_ref.
b_l := tx.Bucket(JOBS_LOG)

if stat.Log != nil {
log.Printf("Logging stdout:%s", stat.Log.Stdout )
log.Printf("Logging stdout:%s", stat.Log.Stdout)
d, _ := proto.Marshal(stat.Log)
b_l.Put([]byte(fmt.Sprint("%s-%d", stat.Id, stat.Step)), d )
b_l.Put([]byte(fmt.Sprint("%s-%d", stat.Id, stat.Step)), d)
}

switch stat.State {
Expand All @@ -265,17 +259,16 @@ func (self *TaskBolt) UpdateJobStatus(ctx context.Context, stat *ga4gh_task_ref.
}
return nil
})
return &ga4gh_task_exec.JobId{Value:stat.Id}, nil
return &ga4gh_task_exec.JobId{Value: stat.Id}, nil
}

func (self *TaskBolt) WorkerPing(ctx context.Context, info *ga4gh_task_ref.WorkerInfo) (*ga4gh_task_ref.WorkerInfo, error) {
log.Printf("Worker Ping")
return info, nil
}


func (self *TaskBolt) GetServiceInfo(ctx context.Context, info *ga4gh_task_exec.ServiceInfoRequest) (*ga4gh_task_exec.ServiceInfo, error) {
return &ga4gh_task_exec.ServiceInfo{StorageConfig:self.storage_metadata}, nil
return &ga4gh_task_exec.ServiceInfo{StorageConfig: self.storage_metadata}, nil
}

func (self *TaskBolt) GetQueueInfo(request *ga4gh_task_ref.QueuedTaskInfoRequest, server ga4gh_task_ref.Scheduler_GetQueueInfoServer) error {
Expand All @@ -287,7 +280,7 @@ func (self *TaskBolt) GetQueueInfo(request *ga4gh_task_ref.QueuedTaskInfoRequest
c := bq.Cursor()
var count int32 = 0
for k, v := c.First(); k != nil && count < request.MaxTasks; k, v = c.Next() {
if (string(v) == ga4gh_task_exec.State_Queued.String()) {
if string(v) == ga4gh_task_exec.State_Queued.String() {
v := bt.Get(k)
out := ga4gh_task_exec.Task{}
proto.Unmarshal(v, &out)
Expand All @@ -303,7 +296,7 @@ func (self *TaskBolt) GetQueueInfo(request *ga4gh_task_ref.QueuedTaskInfoRequest
for _, i := range m.Inputs {
inputs = append(inputs, i.Location)
}
server.Send(&ga4gh_task_ref.QueuedTaskInfo{Inputs:inputs, Resources:m.Resources})
server.Send(&ga4gh_task_ref.QueuedTaskInfo{Inputs: inputs, Resources: m.Resources})
}
return nil
}
}
8 changes: 3 additions & 5 deletions src/ga4gh-server/task_interface.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@

package ga4gh_task

import (
//"golang.org/x/net/context"
//"ga4gh-tasks"
//"golang.org/x/net/context"
//"ga4gh-tasks"
)

type TaskImpl struct {
}


func NewTaskImpl() *TaskImpl {
return &TaskImpl{}
}
Expand Down Expand Up @@ -45,4 +43,4 @@ func (self *TaskImpl) CancelTaskOp(context.Context, *ga4gh_task_exec.TaskOpId) (
}
*/
*/
15 changes: 6 additions & 9 deletions src/ga4gh-server/task_server.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@

package ga4gh_task

import (
"ga4gh-server/proto"
"ga4gh-tasks"
"google.golang.org/grpc"
"log"
"net"
"google.golang.org/grpc"
"ga4gh-tasks"
"ga4gh-server/proto"
)

/// Common GA4GH server, multiple services could be placed into the same server
/// For the moment there is just the task server
type GA4GHServer struct {
task ga4gh_task_exec.TaskServiceServer
task ga4gh_task_exec.TaskServiceServer
sched ga4gh_task_ref.SchedulerServer
}

func NewGA4GHServer() *GA4GHServer {
return &GA4GHServer {}
return &GA4GHServer{}
}

func (self *GA4GHServer) RegisterTaskServer(task ga4gh_task_exec.TaskServiceServer) {
Expand All @@ -28,10 +27,8 @@ func (self *GA4GHServer) RegisterScheduleServer(sched ga4gh_task_ref.SchedulerSe
self.sched = sched
}



func (self *GA4GHServer) Start(host_port string) {
lis, err := net.Listen("tcp", ":" + host_port)
lis, err := net.Listen("tcp", ":"+host_port)
if err != nil {
panic("Cannot open port")
}
Expand Down
51 changes: 28 additions & 23 deletions src/ga4gh-taskserver/server.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,34 @@
package main

import (
"os"
"fmt"
"flag"
"net/http"
"path/filepath"
"golang.org/x/net/context"
"fmt"
"ga4gh-engine"
"ga4gh-engine/scaling"
"ga4gh-server"
"ga4gh-tasks"
"github.com/gorilla/mux"
"google.golang.org/grpc"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"ga4gh-tasks"
"ga4gh-server"
"runtime/debug"
"golang.org/x/net/context"
"google.golang.org/grpc"
"log"
"ga4gh-engine"
"ga4gh-engine/scaling"
"net/http"
"os"
"path/filepath"
"runtime/debug"
)



func main() {
http_port := flag.String("port", "8000", "HTTP Port")
rpc_port := flag.String("rpc", "9090", "HTTP Port")
storage_dir_arg := flag.String("storage", "storage", "Storage Dir")
swift_arg := flag.Bool("swift", false, "Use SWIFT object store")
task_db := flag.String("db", "ga4gh_tasks.db", "Task DB File")
scaler_name := flag.String("scaler", "local", "Scaler")

flag.Parse()
dir, _ := filepath.Abs(os.Args[0])

dir, _ := filepath.Abs(os.Args[0])
content_dir := filepath.Join(dir, "..", "..", "share")

config := map[string]string{}
Expand All @@ -39,10 +38,16 @@ func main() {

//server meta-data
storage_dir, _ := filepath.Abs(*storage_dir_arg)
meta_data := map[string]string{ "storageType" : "sharedFile", "baseDir" : storage_dir }
var meta_data = make(map[string]string)
if !*swift_arg {
meta_data["storageType"] = "sharedFile"
meta_data["baseDir"] = storage_dir
} else {
meta_data["storageType"] = "swift"
}

//setup GRPC listener
taski := ga4gh_task.NewTaskBolt(*task_db, meta_data) //ga4gh_task.NewTaskImpl()
taski := ga4gh_task.NewTaskBolt(*task_db, meta_data)

//setup scheduler
scheduler := ga4gh_taskengine.Scheduler(taski, scaler)
Expand All @@ -58,9 +63,9 @@ func main() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
opts := []grpc.DialOption{grpc.WithInsecure()}
log.Println("Proxy connecting to localhost:" + *rpc_port )
err := ga4gh_task_exec.RegisterTaskServiceHandlerFromEndpoint(ctx, grpc_mux, "localhost:" + *rpc_port, opts)
if (err != nil) {
log.Println("Proxy connecting to localhost:" + *rpc_port)
err := ga4gh_task_exec.RegisterTaskServiceHandlerFromEndpoint(ctx, grpc_mux, "localhost:"+*rpc_port, opts)
if err != nil {
fmt.Println("Register Error", err)

}
Expand All @@ -74,12 +79,12 @@ func main() {
}
// Routes consist of a path and a handler function
r.HandleFunc("/",
func (w http.ResponseWriter, r *http.Request) {
func(w http.ResponseWriter, r *http.Request) {
http.ServeFile(w, r, filepath.Join(content_dir, "index.html"))
})
r.PathPrefix("/static/").Handler(http.StripPrefix("/static/", http.FileServer(http.Dir(content_dir))))

r.PathPrefix("/v1/").Handler(grpc_mux)
log.Printf("Listening on port: %s\n", *http_port)
http.ListenAndServe(":" + *http_port, r)
}
http.ListenAndServe(":"+*http_port, r)
}

0 comments on commit 8335bfc

Please sign in to comment.