Move Nomad Meta policy to satisfy policy interface.
Previously the Nomad meta policy engine conflicted with the Consul
storage backend. This was due to the early storage design, which
catered for policies only. The storage backend has since been extended
to hold additional state items, so it is no longer policy-specific.
The Nomad meta policy engine can therefore run alongside the Consul
state backend, with the Nomad meta policies stored in memory, since we
treat Nomad as the source of truth.

The Nomad meta watcher has also been moved over to the watcher
package, in the same way deployments are tracked. Several
improvements to the meta watcher are included, such as handling
groups whose policy has been removed while the job continues to run.
jrasell committed Oct 1, 2019
1 parent 660d6ea commit 5567cd5
Showing 11 changed files with 311 additions and 297 deletions.
5 changes: 0 additions & 5 deletions cmd/server/command.go
@@ -68,10 +68,5 @@ func verifyServerConfig(cfg serverCfg.Config) error {
 if cfg.NomadMetaPolicyEngine && cfg.APIPolicyEngine {
 return errors.New("Please only enable one policy engine")
 }
-
-if cfg.NomadMetaPolicyEngine && cfg.ConsulStorageBackend {
-return errors.New("Consul storage backend is not compatible with Nomad meta policy engine")
-}
-
 return nil
 }
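
With that check removed, enabling the Nomad meta policy engine together
with the Consul storage backend now passes verification. A minimal
sketch of the now-valid combination, using only the Config fields
visible in this diff:

	cfg := serverCfg.Config{
		NomadMetaPolicyEngine: true,
		ConsulStorageBackend:  true, // no longer rejected here
	}
	if err := verifyServerConfig(cfg); err != nil {
		panic(err) // not hit for this combination after this change
	}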
1 change: 0 additions & 1 deletion go.mod
@@ -5,7 +5,6 @@ go 1.12
require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878
github.com/davecgh/go-spew v1.1.1
github.com/gofrs/uuid v3.2.0+incompatible
github.com/gorilla/mux v1.7.1
github.com/hashicorp/consul/api v1.1.0
@@ -1,4 +1,4 @@
-package watcher
+package nomadmeta

const (
metaKeyEnabled = "sherpa_enabled"
21 changes: 21 additions & 0 deletions pkg/policy/backend/nomadmeta/nomadmeta.go
@@ -0,0 +1,21 @@
package nomadmeta

import (
"github.com/hashicorp/nomad/api"
"github.com/jrasell/sherpa/pkg/policy/backend"
"github.com/jrasell/sherpa/pkg/policy/backend/memory"
"github.com/rs/zerolog"
)

// NewJobScalingPolicies produces a new policy backend and processor. The policy backend is just
// the memory backend. The processor is used to handle job watcher updates, where the job is
// inspected for its status, and then any Sherpa meta parameters pulled out and validated.
func NewJobScalingPolicies(logger zerolog.Logger, nomad *api.Client) (backend.PolicyBackend, *Processor) {
b := memory.NewJobScalingPolicies()
return b, &Processor{
logger: logger,
nomad: nomad,
backend: b,
jobUpdateChan: make(chan interface{}),
}
}
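
As a usage illustration (not part of this commit), the backend and
processor pair might be wired up roughly as follows; the job watcher
which feeds the channel is assumed:

	// backendStore satisfies backend.PolicyBackend; processor consumes
	// job updates and keeps the store reconciled.
	backendStore, processor := nomadmeta.NewJobScalingPolicies(logger, nomadClient)
	go processor.Run()

	// A job watcher pushes job list stubs onto the processor's channel.
	updateChan := processor.GetUpdateChannel()
	updateChan <- &api.JobListStub{ID: "example-job", Status: "running"}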
228 changes: 228 additions & 0 deletions pkg/policy/backend/nomadmeta/processor.go
@@ -0,0 +1,228 @@
package nomadmeta

import (
"strconv"

"github.com/hashicorp/nomad/api"
"github.com/jrasell/sherpa/pkg/policy"
"github.com/jrasell/sherpa/pkg/policy/backend"
"github.com/rs/zerolog"
)

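// Processor handles job list updates from the Nomad meta watcher,
// translating Sherpa meta parameters into scaling policies held in the
// policy backend store.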
type Processor struct {
logger zerolog.Logger
nomad *api.Client
backend backend.PolicyBackend
jobUpdateChan chan interface{}
}

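// Run starts the long-running loop which receives job update messages
// and handles each one in its own goroutine.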
func (pr *Processor) Run() {
pr.logger.Info().Msg("starting Nomad meta job update handler")
for {
select {
case msg := <-pr.jobUpdateChan:
go pr.handleJobListMessage(msg)
}
}
}

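// GetUpdateChannel returns the channel on which job list updates
// should be sent for processing.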
func (pr *Processor) GetUpdateChannel() chan interface{} {
return pr.jobUpdateChan
}

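// handleJobListMessage dispatches work based on the status of the
// updated job.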
func (pr *Processor) handleJobListMessage(msg interface{}) {
job, ok := msg.(*api.JobListStub)
if !ok {
pr.logger.Error().Msg("received unexpected job update message type")
return
}
pr.logger.Debug().Msg("received job list update message to handle")

switch job.Status {
case "running":
go pr.handleRunningJob(job.ID)
case "dead":
go pr.handleDeadJob(job.ID)
case "pending":
// Pending is an in-between state, so just pass this through and do not do any work until
// the job has a more actionable state.
}
}

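// handleDeadJob removes any stored scaling policies for a job which is
// no longer running.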
func (pr *Processor) handleDeadJob(jobID string) {
if err := pr.backend.DeleteJobPolicy(jobID); err != nil {
pr.logger.Error().
Str("job", jobID).
Err(err).
Msg("failed to delete job group policies from backend store")
}
}

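// handleRunningJob inspects the task groups of a running job, building
// policies from any Sherpa meta keys and reconciling the backend store.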
func (pr *Processor) handleRunningJob(jobID string) {
pr.logger.Debug().Str("job", jobID).Msg("reading job group meta stanzas")

info, _, err := pr.nomad.Jobs().Info(jobID, nil)
if err != nil {
pr.logger.Error().Err(err).Msg("failed to call Nomad API for job information")
return
}

// Create a new object which will track all policies pulled from the job. Creating a new object
// helps remove policies which have been removed from task groups as the policy state will be
// overwritten.
policies := map[string]*policy.GroupScalingPolicy{}

for i := range info.TaskGroups {
if pr.hasMetaKeys(info.TaskGroups[i].Meta) {
policies[*info.TaskGroups[i].Name] = pr.policyFromMeta(info.TaskGroups[i].Meta)
}
}

// If we have 0 policies, delete any stored policies for that job. This helps protect against
// situations where a job's meta scaling policy has been removed, but the job is still running.
switch len(policies) {
case 0:
if err := pr.backend.DeleteJobPolicy(jobID); err != nil {
pr.logger.Error().
Str("job", jobID).
Err(err).
Msg("failed to delete job group policies from backend store")
}
default:
if err := pr.backend.PutJobPolicy(jobID, policies); err != nil {
pr.logger.Error().
Str("job", jobID).
Err(err).
Msg("failed to add job group policies to backend store")
}
}
}

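// policyFromMeta builds a group scaling policy from the Sherpa meta
// keys, falling back to the policy defaults for any missing or invalid
// value.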
func (pr *Processor) policyFromMeta(meta map[string]string) *policy.GroupScalingPolicy {
return &policy.GroupScalingPolicy{
MaxCount: pr.maxCountValueOrDefault(meta),
MinCount: pr.minCountValueOrDefault(meta),
Enabled: pr.enabledValueOrDefault(meta),
ScaleInCount: pr.scaleInValueOrDefault(meta),
ScaleOutCount: pr.scaleOutValueOrDefault(meta),
ScaleOutCPUPercentageThreshold: pr.scaleOutCPUThresholdValueOrDefault(meta),
ScaleOutMemoryPercentageThreshold: pr.scaleOutMemoryThresholdValueOrDefault(meta),
ScaleInCPUPercentageThreshold: pr.scaleInCPUThresholdValueOrDefault(meta),
ScaleInMemoryPercentageThreshold: pr.scaleInMemoryThresholdValueOrDefault(meta),
}
}

func (pr *Processor) enabledValueOrDefault(meta map[string]string) bool {
if val, ok := meta[metaKeyEnabled]; ok {
enabled, err := strconv.ParseBool(val)
if err != nil {
pr.logger.Error().Err(err).Msg("failed to convert enabled meta value to bool")
return false
}
return enabled
}
return false
}

func (pr *Processor) maxCountValueOrDefault(meta map[string]string) int {
if val, ok := meta[metaKeyMaxCount]; ok {
maxInt, err := strconv.Atoi(val)
if err != nil {
pr.logger.Error().Err(err).Msg("failed to convert max count meta value to int")
return policy.DefaultMaxCount
}
return maxInt
}
return policy.DefaultMaxCount
}

func (pr *Processor) minCountValueOrDefault(meta map[string]string) int {
if val, ok := meta[metaKeyMinCount]; ok {
minInt, err := strconv.Atoi(val)
if err != nil {
pr.logger.Error().Err(err).Msg("failed to convert min count meta value to int")
return policy.DefaultMinCount
}
return minInt
}
return policy.DefaultMinCount
}

func (pr *Processor) scaleInValueOrDefault(meta map[string]string) int {
if val, ok := meta[metaKeyScaleInCount]; ok {
inCount, err := strconv.Atoi(val)
if err != nil {
pr.logger.Error().Err(err).Msg("failed to convert scale in meta value to int")
return policy.DefaultScaleInCount
}
return inCount
}
return policy.DefaultScaleInCount
}

func (pr *Processor) scaleOutValueOrDefault(meta map[string]string) int {
if val, ok := meta[metaKeyScaleOutCount]; ok {
outCount, err := strconv.Atoi(val)
if err != nil {
pr.logger.Error().Err(err).Msg("failed to convert scale out meta value to int")
return policy.DefaultScaleOutCount
}
return outCount
}
return policy.DefaultScaleOutCount
}

func (pr *Processor) scaleOutCPUThresholdValueOrDefault(meta map[string]string) int {
if val, ok := meta[metaKeyScaleOutCPUPercentageThreshold]; ok {
outThreshold, err := strconv.Atoi(val)
if err != nil {
pr.logger.Error().Err(err).Msg("failed to convert scale out CPU meta value to int")
return policy.DefaultScaleOutCPUPercentageThreshold
}
return outThreshold
}
return policy.DefaultScaleOutCPUPercentageThreshold
}

func (pr *Processor) scaleInCPUThresholdValueOrDefault(meta map[string]string) int {
if val, ok := meta[metaKeyScaleInCPUPercentageThreshold]; ok {
inThreshold, err := strconv.Atoi(val)
if err != nil {
pr.logger.Error().Err(err).Msg("failed to convert scale in CPU meta value to int")
return policy.DefaultScaleInCPUPercentageThreshold
}
return inThreshold
}
return policy.DefaultScaleInCPUPercentageThreshold
}

func (pr *Processor) scaleOutMemoryThresholdValueOrDefault(meta map[string]string) int {
if val, ok := meta[metaKeyScaleOutMemoryPercentageThreshold]; ok {
outThreshold, err := strconv.Atoi(val)
if err != nil {
pr.logger.Error().Err(err).Msg("failed to convert scale out memory meta value to int")
return policy.DefaultScaleOutMemoryPercentageThreshold
}
return outThreshold
}
return policy.DefaultScaleOutMemoryPercentageThreshold
}

func (pr *Processor) scaleInMemoryThresholdValueOrDefault(meta map[string]string) int {
if val, ok := meta[metaKeyScaleInMemoryPercentageThreshold]; ok {
inThreshold, err := strconv.Atoi(val)
if err != nil {
pr.logger.Error().Err(err).Msg("failed to convert scale in memory meta value to int")
return policy.DefaultScaleInMemoryPercentageThreshold
}
return inThreshold
}
return policy.DefaultScaleInMemoryPercentageThreshold
}

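// hasMetaKeys reports whether the group meta contains the Sherpa
// enabled key, indicating a scaling policy is configured for the group.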
func (pr *Processor) hasMetaKeys(meta map[string]string) bool {
if _, ok := meta[metaKeyEnabled]; ok {
return true
}
return false
}
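
For illustration, a rough sketch of the flow from group meta to policy;
only the sherpa_enabled key name is visible in this diff, so the max
count key name below is an assumption following the same prefix
convention:

	meta := map[string]string{
		"sherpa_enabled":   "true", // metaKeyEnabled, confirmed above
		"sherpa_max_count": "10",   // assumed name for metaKeyMaxCount
	}
	p := processor.policyFromMeta(meta)
	// p.Enabled == true and p.MaxCount == 10; every other field falls
	// back to its policy.Default* value.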
@@ -1,4 +1,4 @@
-package watcher
+package nomadmeta

import (
"testing"
@@ -8,8 +8,8 @@ import (
"github.com/stretchr/testify/assert"
)

-func TestMetaWatcher_policyFromMeta(t *testing.T) {
-watcher := NewMetaWatcher(zerolog.Logger{}, nil, nil)
+func TestProcessor_policyFromMeta(t *testing.T) {
+_, p := NewJobScalingPolicies(zerolog.Logger{}, nil)

testCases := []struct {
meta map[string]string
@@ -121,7 +121,7 @@ func TestMetaWatcher_policyFromMeta(t *testing.T) {
}

for _, tc := range testCases {
-actualPolicy := watcher.policyFromMeta(tc.meta)
+actualPolicy := p.policyFromMeta(tc.meta)
assert.Equal(t, tc.expectedPolicy, actualPolicy)
}
}