Skip to content

Commit

Permalink
Merge pull request ga4gh#1 from bmeg/master
Browse files Browse the repository at this point in the history
Initial version of working reference server
  • Loading branch information
kellrott authored Oct 5, 2016
2 parents dd0162f + 7193cb5 commit 769f0ae
Show file tree
Hide file tree
Showing 25 changed files with 2,882 additions and 1 deletion.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
*.pyc
.idea
/bin
/pkg
/test_tmp
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "task-execution-schemas"]
path = task-execution-schemas
url = https://github.com/ga4gh/task-execution-schemas.git
32 changes: 32 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@

GOPATH := $(shell pwd)
export GOPATH
PATH := ${PATH}:$(shell pwd)/bin
export PATH

PROTO_INC= -I ./ -I $(GOPATH)/src/github.com/gengo/grpc-gateway/third_party/googleapis/

server:
go install ga4gh-taskserver
go install ga4gh-worker

proto_build:
cd task-execution-schemas/proto && protoc $(PROTO_INC) \
--go_out=Mgoogle/api/annotations.proto=github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis/google/api,plugins=grpc:../../src/ga4gh-tasks/ \
--grpc-gateway_out=logtostderr=true:../../src/ga4gh-tasks/ \
task_execution.proto
cd proto && protoc \
$(PROTO_INC) \
-I ../task-execution-schemas/proto/ \
--go_out=Mtask_execution.proto=ga4gh-tasks,plugins=grpc:../src/ga4gh-server/proto \
task_worker.proto

grpc:
go get -u github.com/golang/protobuf/protoc-gen-go
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway

depends: grpc
go get -d ga4gh-taskserver/
go get -d ga4gh-worker/

44 changes: 43 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,43 @@
# task-execution-server
# task-execution-server

## Initial tool install
```
make depends
```


## Build project
```
make
```

## Start task server
```
./bin/ga4gh-taskserver
```

## Start worker
```
./bin/ga4gh-worker
```

## Get info about task execution service
```
curl http://localhost:8000/v1/jobs-service
```

## Get Task Execution Server CWL runner
```
git clone https://github.com/bmeg/funnel.git
cd funnel/
virtualenv venv
. venv/bin/activate
pip install cwltool
pip install pyyaml
```

## Run Example workflow
```
python funnel/main.py --tes tes.yaml test/hashsplitter-workflow.cwl --input README.md
```
52 changes: 52 additions & 0 deletions proto/task_worker.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@

syntax = "proto3";

package ga4gh_task_ref;

import "task_execution.proto";

/**
* Worker Info
*/
message WorkerInfo {
string id = 1; /// UUID of the worker
int64 last_ping = 2; /// Last time contacted
string hostname = 3;
}

message JobRequest {
WorkerInfo worker = 1;
ga4gh_task_exec.Resources resources = 2;
}

message JobResponse {
ga4gh_task_exec.Job job = 2;
}

message UpdateStatusRequest {
string id = 1;
int64 step = 2;
ga4gh_task_exec.State state = 3;
ga4gh_task_exec.JobLog log = 4;
string worker_id = 5;
}

message QueuedTaskInfoRequest {
int32 max_tasks = 1;
}

message QueuedTaskInfo {
repeated string inputs = 1;
ga4gh_task_exec.Resources resources = 2;
}


/**
* Scheduler Service
*/
service Scheduler {
rpc GetQueueInfo(QueuedTaskInfoRequest) returns(stream QueuedTaskInfo) {};
rpc GetJobToRun(JobRequest) returns (JobResponse) {};
rpc UpdateJobStatus(UpdateStatusRequest) returns (ga4gh_task_exec.JobId) {};
rpc WorkerPing(WorkerInfo) returns (WorkerInfo) {};
}
37 changes: 37 additions & 0 deletions src/ga4gh-engine/scaling/mapping.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@

package ga4gh_engine_scaling
import (
"ga4gh-tasks"
"ga4gh-server/proto"
)


type Scaler interface {
JobAdded(*ga4gh_task_exec.Resources)
PingReceived(*ga4gh_task_ref.WorkerInfo)
}

type ScalerInit func(map[string]string) Scaler


var ScalingMethods = map[string]ScalerInit{
"local" : NewLocalScaler,
}

type LocalScaler struct {

}

func NewLocalScaler(config map[string]string) Scaler {
return LocalScaler{}
}

func (self LocalScaler) JobAdded(request *ga4gh_task_exec.Resources) {
//check for number of running workers

//launch new worker if needed and possible
}

func (self LocalScaler) PingReceived(worker *ga4gh_task_ref.WorkerInfo) {
//do something here
}
42 changes: 42 additions & 0 deletions src/ga4gh-engine/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@

package ga4gh_taskengine
import (
"ga4gh-tasks"
"ga4gh-engine/scaling"
"golang.org/x/net/context"
"ga4gh-server/proto"
//"log"
)

type TaskDB interface {
ga4gh_task_exec.TaskServiceServer
ga4gh_task_ref.SchedulerServer
}

func Scheduler(task_server TaskDB, scaler ga4gh_engine_scaling.Scaler) *TaskScheduler {
return &TaskScheduler{task_server:task_server, scaler:scaler}
}

type TaskScheduler struct {
task_server TaskDB
scaler ga4gh_engine_scaling.Scaler
}

func (self *TaskScheduler) WorkerPing(ctx context.Context, info *ga4gh_task_ref.WorkerInfo) (*ga4gh_task_ref.WorkerInfo, error) {
self.scaler.PingReceived(info)
return info, nil
}


func (self *TaskScheduler) GetJobToRun(ctx context.Context, request *ga4gh_task_ref.JobRequest) (*ga4gh_task_ref.JobResponse, error) {
return self.task_server.GetJobToRun(ctx, request)
}


func (self *TaskScheduler) UpdateJobStatus(ctx context.Context, stat *ga4gh_task_ref.UpdateStatusRequest) (*ga4gh_task_exec.JobId, error) {
return self.task_server.UpdateJobStatus(ctx, stat)
}

func (self *TaskScheduler) GetQueueInfo(request *ga4gh_task_ref.QueuedTaskInfoRequest, server ga4gh_task_ref.Scheduler_GetQueueInfoServer) error {
return self.task_server.GetQueueInfo(request, server)
}
94 changes: 94 additions & 0 deletions src/ga4gh-engine/worker/container.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@

package ga4gh_taskengine_worker

import (
"os"
"log"
"strings"
"github.com/fsouza/go-dockerclient"
)


type ContainerManager interface {
Run(container string, args []string, binds[] string, remove bool, stdout_path *string, stderr_path *string) error
}

type DockerDirect struct {
client *docker.Client
}

func NewDockerDirect() *DockerDirect {
client, err := docker.NewClientFromEnv()
if err != nil {
log.Printf("Docker Error\n")
return nil
}
return &DockerDirect{ client:client }
}

func (self *DockerDirect) Run(containerName string, args []string, binds[] string, remove bool, stdout *os.File, stderr *os.File) (int, error) {

create_config := docker.Config{
Image:containerName,
Cmd:args,
AttachStderr:true,
AttachStdout:true,
}

if _, ok := self.client.InspectImage(containerName); ok != nil {
log.Printf("Image %s not found", containerName)
tmp := strings.Split(containerName, ":")
rep := tmp[0]
tag := "latest"
if len(tmp) > 1 {
tag = tmp[1]
}
pull_opt := docker.PullImageOptions{ Repository: rep, Tag: tag }
if ok := self.client.PullImage( pull_opt, docker.AuthConfiguration{} ); ok != nil {
log.Printf("Image not pulled: %s", ok)
return -1, ok
}
log.Printf("Image Pulled")
}

container, err := self.client.CreateContainer(docker.CreateContainerOptions{
Config: &create_config,
})
if err != nil {
log.Printf("Docker run Error: %s", err)
return 0, err
}

log.Printf("Starting Docker (mount: %s): %s", strings.Join(binds, ","), strings.Join(args, " "))
err = self.client.StartContainer(container.ID, &docker.HostConfig {
Binds: binds,
})

if err != nil {
log.Printf("Docker run Error: %s", err)
return 0, err
}

log.Printf("Attaching Container: %s", container.ID)
exit_code, err := self.client.WaitContainer(container.ID)

logOpts := docker.LogsOptions{Container:container.ID, Stdout:false, Stderr:false}

if stdout != nil {
logOpts.Stdout = true
logOpts.OutputStream = stdout
}
if stderr != nil {
logOpts.Stderr = true
logOpts.ErrorStream = stderr
}

self.client.Logs(logOpts)
if err != nil {
log.Printf("docker %s error: %s", container.ID, err)
} else {
log.Printf("docker %s complete", container.ID, err)
}
self.client.RemoveContainer(docker.RemoveContainerOptions{ID:container.ID,RemoveVolumes:true})
return exit_code, nil
}
Loading

0 comments on commit 769f0ae

Please sign in to comment.