Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add execution worker pool #5

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions runner/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,34 @@ type Config struct {
}

type App struct {
config *Config
webEngine *gin.Engine
deps Dependencies
config *Config
Dependencies
}

type Dependencies struct {
runnerService RunnerService
webEngine *gin.Engine
executionWorkerPool *ExecutionWorkerPool
runnerService RunnerService
}

func DefaultDependencies(config *Config) Dependencies {
webEngine := gin.New()
webEngine.Use(gin.Recovery())

mode := os.Getenv(gin.EnvGinMode)
gin.SetMode(mode)

runnerService, err := NewRunnerService(config)
if err != nil {
log.Fatalf("Failed to create the runner instance: %s", err)
}

executionWorkerPool := NewExecutionWorkerPool(runnerService)

return Dependencies{
runnerService: runnerService,
webEngine,
executionWorkerPool,
runnerService,
}
}

Expand All @@ -48,25 +59,18 @@ func NewApp(config *Config) (*App, error) {

func NewAppWithDeps(config *Config, deps Dependencies) (*App, error) {
app := &App{
config: config,
deps: deps,
config: config,
Dependencies: deps,
}

engine := gin.New()
engine.Use(gin.Recovery())

mode := os.Getenv(gin.EnvGinMode)
gin.SetMode(mode)

apiGroup := engine.Group("/api")
apiGroup := deps.webEngine.Group("/api")
{
apiGroup.GET("/health", HealthHandler)
apiGroup.GET("/ready", ReadyHandler(deps.runnerService))
apiGroup.GET("/catalog", CatalogHandler(deps.runnerService))
apiGroup.POST("/execute", ExecutionHandler(deps.runnerService))
}

app.webEngine = engine

return app, nil
}

Expand All @@ -91,9 +95,15 @@ func (a *App) Start(ctx context.Context) error {
return nil
})

log.Infof("Starting execution requests worker pool....")
g.Go(func() error {
a.executionWorkerPool.Run(ctx)
return nil
})

log.Infof("Building catalog....")
g.Go(func() error {
err := a.deps.runnerService.BuildCatalog()
err := a.runnerService.BuildCatalog()
if err != nil {
return err
}
Expand Down
5 changes: 2 additions & 3 deletions runner/catalog_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ func (suite *CatalogApiTestCase) Test_GetCatalogTest() {
mockRunnerService := new(MockRunnerService)
mockRunnerService.On("GetCatalog").Return(returnedCatalog)

deps := Dependencies{
mockRunnerService,
}
deps := setupTestDependencies()
deps.runnerService = mockRunnerService

app, err := NewAppWithDeps(suite.config, deps)
if err != nil {
Expand Down
25 changes: 25 additions & 0 deletions runner/execution_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package runner

import (
"github.com/gin-gonic/gin"
)

func ExecutionHandler(runnerService RunnerService) gin.HandlerFunc {
return func(c *gin.Context) {
var r *ExecutionEvent

if err := c.BindJSON(&r); err != nil {
c.Error(err)
c.AbortWithStatusJSON(500, gin.H{"status": "nok", "message": err.Error()})
return
}

if err := runnerService.ScheduleExecution(r); err != nil {
c.Error(err)
c.AbortWithStatusJSON(500, gin.H{"status": "nok", "message": err.Error()})
return
}

c.JSON(202, map[string]string{"status": "ok"})
}
}
5 changes: 2 additions & 3 deletions runner/health_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ func (suite *HealthApiTestCase) Test_ApiReadyTest() {
mockRunnerService := new(MockRunnerService)
mockRunnerService.On("IsCatalogReady").Return(true)

deps := Dependencies{
mockRunnerService,
}
deps := setupTestDependencies()
deps.runnerService = mockRunnerService

app, err := NewAppWithDeps(suite.config, deps)
if err != nil {
Expand Down
40 changes: 35 additions & 5 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,41 @@ import (
var ansibleFS embed.FS

const (
executionChannelSize = 99

AnsibleMain = "ansible/check.yml"
AnsibleMeta = "ansible/meta.yml"
AnsibleConfigFile = "ansible/ansible.cfg"
AnsibleHostFile = "ansible/ansible_hosts"
)

type ExecutionEvent struct {
ID int64 `json:"id" binding:"required"`
}

//go:generate mockery --name=RunnerService --inpackage --filename=runner_mock.go

type RunnerService interface {
IsCatalogReady() bool
BuildCatalog() error
GetCatalog() map[string]*Catalog
GetChannel() chan *ExecutionEvent
ScheduleExecution(e *ExecutionEvent) error
Execute(e *ExecutionEvent) error
}

type runnerService struct {
config *Config
catalog map[string]*Catalog
ready bool
config *Config
workerPoolChannel chan *ExecutionEvent
catalog map[string]*Catalog
ready bool
}

func NewRunnerService(config *Config) (*runnerService, error) {
runner := &runnerService{
config: config,
ready: false,
config: config,
workerPoolChannel: make(chan *ExecutionEvent, executionChannelSize),
ready: false,
}

return runner, nil
Expand Down Expand Up @@ -87,6 +98,25 @@ func (c *runnerService) GetCatalog() map[string]*Catalog {
return c.catalog
}

func (c *runnerService) GetChannel() chan *ExecutionEvent {
return c.workerPoolChannel
}

func (c *runnerService) ScheduleExecution(e *ExecutionEvent) error {
if len(c.workerPoolChannel) == executionChannelSize {
return fmt.Errorf("Cannot process more executions")
}

c.workerPoolChannel <- e
log.Infof("Scheduled event: %d", e.ID)
return nil
}

func (c *runnerService) Execute(e *ExecutionEvent) error {
log.Infof("Executing event: %d", e.ID)
return nil
}

func createAnsibleFiles(folder string) error {
log.Infof("Creating the ansible file structure in %s", folder)
// Clean the folder if it stores old files
Expand Down
44 changes: 44 additions & 0 deletions runner/runner_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,24 @@ func (suite *RunnerTestCase) Test_BuildCatalog() {
suite.Equal(expectedMap, suite.runnerService.GetCatalog())
}

func (suite *RunnerTestCase) Test_ScheduleExecution() {
execution := &ExecutionEvent{ID: 1}
err := suite.runnerService.ScheduleExecution(execution)
suite.NoError(err)
suite.Equal(execution, <-suite.runnerService.GetChannel())
}

func (suite *RunnerTestCase) Test_ScheduleExecution_Full() {
ch := suite.runnerService.GetChannel()
for _, index := range [executionChannelSize]int64{} {
ch <- &ExecutionEvent{ID: index}
}

execution := &ExecutionEvent{ID: 1}
err := suite.runnerService.ScheduleExecution(execution)
suite.EqualError(err, "Cannot process more executions")
}

// TODO: This test could be improved to check the definitve ansible files structure
// once we have something fixed
func (suite *RunnerTestCase) Test_CreateAnsibleFiles() {
Expand Down
11 changes: 11 additions & 0 deletions runner/test_helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package runner

import (
"github.com/gin-gonic/gin"
)

func setupTestDependencies() Dependencies {
return Dependencies{
webEngine: gin.Default(),
}
}
55 changes: 55 additions & 0 deletions runner/worker_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package runner

import (
"context"
"time"

log "github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)

var workersNumber int64 = 3
var drainTimeout = time.Second * 5

type ExecutionWorkerPool struct {
runnerService RunnerService
}

func NewExecutionWorkerPool(runnerService RunnerService) *ExecutionWorkerPool {
return &ExecutionWorkerPool{
runnerService: runnerService,
}
}

// Run runs a pool of workers to process the execution requests
func (e *ExecutionWorkerPool) Run(ctx context.Context) {
log.Infof("Starting execution pool. Workers limit: %d", workersNumber)
sem := semaphore.NewWeighted(workersNumber)
channel := e.runnerService.GetChannel()

for {
select {
case execution := <-channel:
if err := sem.Acquire(ctx, 1); err != nil {
log.Debugf("Discarding execution: %d, shutting down already.", execution.ID)
break
}

go func() {
defer sem.Release(1)
e.runnerService.Execute(execution)
}()
case <-ctx.Done():
log.Infof("Projectors worker pool is shutting down... Waiting for active workers to drain.")

ctx, cancel := context.WithTimeout(context.Background(), drainTimeout)
defer cancel()

if err := sem.Acquire(ctx, workersNumber); err != nil {
log.Warnf("Timed out while draining workers: %v", err)
}

return
}
}
}
Loading