Skip to content
This repository has been archived by the owner on Sep 9, 2020. It is now read-only.

Commit

Permalink
Use goroutine pool within autoscaler to limit concurrency.
Browse files Browse the repository at this point in the history
Previously the internal autoscaler iterated through the scaling
policies and launched a go routine to run the autoscaling check
process. There were no limits on the number of routines which
could lead to overloading on the Nomad API.

This change introduces a goroutine pool in order to limit the
number of concurrent scaling threads. The number of threads is
configurable via the CLI, with a default of 3. When all threads
are in use, the iteration will block until additional work
can be consumed. There is a timelimit of the execution, currently
set to 60's which we may want to make configurable in the future.
  • Loading branch information
jrasell committed Aug 21, 2019
1 parent 43b2ec3 commit 5a1b116
Show file tree
Hide file tree
Showing 21 changed files with 1,640 additions and 8 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/hashicorp/nomad/api v0.0.0-20190508234936-7ba2378a159e
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/mattn/go-isatty v0.0.7
github.com/panjf2000/ants v0.0.0-20190820151255-b60dfa8c16b0
github.com/pkg/errors v0.8.1
github.com/rs/zerolog v1.14.3
github.com/ryanuber/columnize v2.1.0+incompatible
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0Qu
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/panjf2000/ants v0.0.0-20190820151255-b60dfa8c16b0 h1:otVy5M/CL7sQJRx9MxpFq3feQp7AEgx1Hp4/RPBRqkE=
github.com/panjf2000/ants v0.0.0-20190820151255-b60dfa8c16b0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY=
github.com/panjf2000/ants v1.2.0 h1:pMQ1/XpSgnWx3ro4y1xr/uA3jXUsTuAaU3Dm0JjwggE=
github.com/panjf2000/ants v1.2.0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
Expand Down
2 changes: 2 additions & 0 deletions pkg/autoscale/autoscale.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func (a *AutoScale) autoscaleJob(jobID string, policies map[string]*policy.Group
cpuUsage := resourceUsage[group].cpu * 100 / resourceInfo[group].cpu
memUsage := resourceUsage[group].mem * 100 / resourceInfo[group].mem
a.logger.Debug().
Str("job", jobID).
Str("group", group).
Int("mem-usage-percentage", memUsage).
Int("cpu-usage-percentage", cpuUsage).
Msg("resource utilisation calculation")
Expand Down
1 change: 1 addition & 0 deletions pkg/autoscale/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ package autoscale

type Config struct {
ScalingInterval int
ScalingThreads int
StrictChecking bool
}
45 changes: 40 additions & 5 deletions pkg/autoscale/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/jrasell/sherpa/pkg/policy"
"github.com/jrasell/sherpa/pkg/policy/backend"
"github.com/jrasell/sherpa/pkg/scale"
"github.com/panjf2000/ants"
"github.com/rs/zerolog"
)

Expand All @@ -17,6 +18,7 @@ type AutoScale struct {
scaler scale.Scale

policyBackend backend.PolicyBackend
pool *ants.PoolWithFunc
state *state
}

Expand All @@ -29,15 +31,28 @@ type scalableResources struct {
mem int
}

func NewAutoScaleServer(l zerolog.Logger, n *nomad.Client, p backend.PolicyBackend, cfg *Config) *AutoScale {
return &AutoScale{
type workerPayload struct {
jobID string
policy map[string]*policy.GroupScalingPolicy
}

func NewHandler(l zerolog.Logger, n *nomad.Client, p backend.PolicyBackend, cfg *Config) (*AutoScale, error) {
as := AutoScale{
cfg: cfg,
logger: l,
nomad: n,
policyBackend: p,
scaler: scale.NewScaler(n, l, cfg.StrictChecking),
state: &state{},
}

pool, err := as.createWorkerPool()
if err != nil {
return nil, err
}
as.pool = pool

return &as, nil
}

func (a *AutoScale) Run() {
Expand All @@ -46,6 +61,8 @@ func (a *AutoScale) Run() {
t := time.NewTicker(time.Second * time.Duration(a.cfg.ScalingInterval))
defer t.Stop()

defer a.pool.Release()

for {
select {
case <-t.C:
Expand Down Expand Up @@ -73,8 +90,11 @@ func (a *AutoScale) Run() {
}

for job := range allPolicies {
go a.runScalingCycle(job, allPolicies[job])
if err := a.pool.Invoke(&workerPayload{jobID: job, policy: allPolicies[job]}); err != nil {
a.logger.Error().Err(err).Msg("failed to invoke autoscaling worker thread")
}
}

a.setScalingInProgressFalse()
}
}
Expand All @@ -88,6 +108,21 @@ func (a *AutoScale) setScalingInProgressFalse() {
a.state.inProgress = false
}

func (a *AutoScale) runScalingCycle(job string, policy map[string]*policy.GroupScalingPolicy) {
a.autoscaleJob(job, policy)
// createWorkerPool is responsible for building the ants goroutine worker pool with the number of
// threads controlled by the operator configured value.
func (a *AutoScale) createWorkerPool() (*ants.PoolWithFunc, error) {
return ants.NewPoolWithFunc(
ants.Options{
Capacity: a.cfg.ScalingThreads,
ExpiryDuration: 60 * time.Second,
PoolFunc: func(payload interface{}) {
req, ok := payload.(*workerPayload)
if !ok {
a.logger.Error().Msg("autoscaler worker pool received unexpected payload type")
return
}
a.autoscaleJob(req.jobID, req.policy)
},
},
)
}
28 changes: 28 additions & 0 deletions pkg/autoscale/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package autoscale

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestAutoScale_createWorkerPool(t *testing.T) {
testCases := []struct {
autoscaleStruct *AutoScale
expectedThreads int
testName string
}{
{
autoscaleStruct: &AutoScale{cfg: &Config{ScalingThreads: 100}},
expectedThreads: 100,
testName: "check worker pool number of concurrent threads",
},
}

for _, tc := range testCases {
pool, err := tc.autoscaleStruct.createWorkerPool()

assert.Nil(t, err, tc.testName)
assert.Equal(t, tc.expectedThreads, pool.Cap(), tc.testName)
}
}
18 changes: 18 additions & 0 deletions pkg/config/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ const (
configKeyBindPort = "bind-port"
configKeyAutoscalerEnabled = "autoscaler-enabled"
configKeyAutoscalerEvaluationInterval = "autoscaler-evaluation-interval"
configKeyAutoscalerThreadNumber = "autoscaler-num-threads"
configKeyAutoscalerThreadNumberDefault = 3
configKeyPolicyEngineAPIEnabled = "policy-engine-api-enabled"
configKeyPolicyEngineNomadMetaEnabled = "policy-engine-nomad-meta-enabled"
configKeyPolicyEngineStrictCheckingEnabled = "policy-engine-strict-checking-enabled"
Expand All @@ -33,6 +35,7 @@ type Config struct {
InternalAutoScaler bool
ConsulStorageBackend bool
InternalAutoScalerEvalPeriod int
InternalAutoScalerNumThreads int
}

func (c *Config) MarshalZerologObject(e *zerolog.Event) {
Expand All @@ -43,6 +46,7 @@ func (c *Config) MarshalZerologObject(e *zerolog.Event) {
Bool(configKeyPolicyEngineStrictCheckingEnabled, c.StrictPolicyChecking).
Bool(configKeyAutoscalerEnabled, c.InternalAutoScaler).
Int(configKeyAutoscalerEvaluationInterval, c.InternalAutoScalerEvalPeriod).
Int(configKeyAutoscalerThreadNumber, c.InternalAutoScalerNumThreads).
Bool(configKeyStorageBackendConsulEnabled, c.ConsulStorageBackend).
Str(configKeyStorageBackendConsulPath, c.ConsulStorageBackendPath)
}
Expand All @@ -56,6 +60,7 @@ func GetConfig() Config {
StrictPolicyChecking: viper.GetBool(configKeyPolicyEngineStrictCheckingEnabled),
InternalAutoScaler: viper.GetBool(configKeyAutoscalerEnabled),
InternalAutoScalerEvalPeriod: viper.GetInt(configKeyAutoscalerEvaluationInterval),
InternalAutoScalerNumThreads: viper.GetInt(configKeyAutoscalerThreadNumber),
ConsulStorageBackend: viper.GetBool(configKeyStorageBackendConsulEnabled),
ConsulStorageBackendPath: viper.GetString(configKeyStorageBackendConsulPath),
}
Expand Down Expand Up @@ -155,6 +160,19 @@ func RegisterConfig(cmd *cobra.Command) {
viper.SetDefault(key, defaultValue)
}

{
const (
key = configKeyAutoscalerThreadNumber
longOpt = "autoscaler-num-threads"
defaultValue = configKeyAutoscalerThreadNumberDefault
description = "Specifies the number of parallel autoscaler threads to run"
)

flags.Int(longOpt, defaultValue, description)
_ = viper.BindPFlag(key, flags.Lookup(longOpt))
viper.SetDefault(key, defaultValue)
}

{
const (
key = configKeyStorageBackendConsulEnabled
Expand Down
1 change: 1 addition & 0 deletions pkg/config/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ func Test_ServerConfig(t *testing.T) {
assert.Equal(t, true, cfg.StrictPolicyChecking)
assert.Equal(t, false, cfg.InternalAutoScaler)
assert.Equal(t, configKeyStorageBackendConsulPathDefault, cfg.ConsulStorageBackendPath)
assert.Equal(t, configKeyAutoscalerThreadNumberDefault, cfg.InternalAutoScalerNumThreads)
}
16 changes: 13 additions & 3 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ func (h *HTTPServer) setup() error {
// If the server has been set to enable the internal autoscaler, set this up and start the
// process running.
if h.cfg.Server.InternalAutoScaler {
h.setupAutoScaling()
if err := h.setupAutoScaling(); err != nil {
return errors.Wrap(err, "failed to setup internal autoscaler")
}
}

initialRoutes := h.setupRoutes()
Expand Down Expand Up @@ -145,14 +147,22 @@ func (h *HTTPServer) setupNomadClient() error {
return nil
}

func (h *HTTPServer) setupAutoScaling() {
func (h *HTTPServer) setupAutoScaling() error {
h.logger.Debug().Msg("setting up Sherpa internal auto-scaling engine")
autoscaleCfg := &autoscale.Config{
StrictChecking: h.cfg.Server.StrictPolicyChecking,
ScalingInterval: h.cfg.Server.InternalAutoScalerEvalPeriod,
ScalingThreads: h.cfg.Server.InternalAutoScalerNumThreads,
}
h.autoScale = autoscale.NewAutoScaleServer(h.logger, h.nomad, h.policyBackend, autoscaleCfg)

as, err := autoscale.NewHandler(h.logger, h.nomad, h.policyBackend, autoscaleCfg)
if err != nil {
return err
}
h.autoScale = as
go h.autoScale.Run()

return nil
}

func (h *HTTPServer) setupListener() net.Listener {
Expand Down
17 changes: 17 additions & 0 deletions vendor/github.com/panjf2000/ants/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions vendor/github.com/panjf2000/ants/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions vendor/github.com/panjf2000/ants/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 5a1b116

Please sign in to comment.