Skip to content
This repository has been archived by the owner on Sep 9, 2020. It is now read-only.

Commit

Permalink
scale-meta: meta should be unique per job group.
Browse files Browse the repository at this point in the history
The autoscaler works by analysing all groups within a job and then
submitting the combined changes required to the scaler process for
actioning. Therefore the metadata sent with a request should be part
of the group request rather than the function call, allowing the
metadata to be unique per group.
  • Loading branch information
jrasell committed Nov 1, 2019
1 parent 0365cb5 commit 5d8737f
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 14 deletions.
7 changes: 4 additions & 3 deletions pkg/autoscale/autoscale.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func (a *AutoScale) autoscaleJob(jobID string, policies map[string]*policy.Group
}

var scaleReq []*scale.GroupReq // nolint:prealloc
meta := make(map[string]string)

for group, pol := range policies {

Expand Down Expand Up @@ -72,12 +71,14 @@ func (a *AutoScale) autoscaleJob(jobID string, policies map[string]*policy.Group
groupLogger.Debug().Msg("no scaling required")
continue
}
groupLogger.Debug().
groupLogger.Info().
Object("decision", decision).
Msg("scaling decision made and action required")

// Iterate over the resource metrics which have broken their thresholds and ensure these
// are added to the submission meta.
meta := make(map[string]string)

for name, metric := range decision.metrics {
updateAutoscaleMeta(group, name, metric.usage, metric.threshold, meta)
}
Expand All @@ -100,7 +101,7 @@ func (a *AutoScale) autoscaleJob(jobID string, policies map[string]*policy.Group
// If group scaling requests have been added to the array for the job that is currently being
// checked, trigger a scaling event.
if len(scaleReq) > 0 {
resp, _, err := a.scaler.Trigger(jobID, scaleReq, state.SourceInternalAutoscaler, meta)
resp, _, err := a.scaler.Trigger(jobID, scaleReq, state.SourceInternalAutoscaler)
if err != nil {
jobLogger.Error().Err(err).Msg("failed to trigger scaling of job")
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/scale/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
// Scale is the interface used for scaling a Nomad job.
type Scale interface {
// Trigger performs scaling of 1 or more job groups which belong to the same job.
Trigger(string, []*GroupReq, state.Source, map[string]string) (*ScalingResponse, int, error)
Trigger(string, []*GroupReq, state.Source) (*ScalingResponse, int, error)

// GetDeploymentChannel is used to return the channel where updates to Nomad deployments should
// be sent.
Expand Down Expand Up @@ -57,6 +57,10 @@ type GroupReq struct {
// triggered. This is to help coordinate with checks such as cooldown and ensure a single time
// can be used.
Time int64

// Meta is the meta data which is optionally submitted when requesting a scaling activity for a
// job group. This is free-form and can contain any information the user deems relevant.
Meta map[string]string
}

type ScalingResponse struct {
Expand Down
4 changes: 2 additions & 2 deletions pkg/scale/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"github.com/jrasell/sherpa/pkg/state"
)

func (s *Scaler) sendScalingEventToState(job, id string, source state.Source, groupReqs []*GroupReq, err error, meta map[string]string) uuid.UUID {
func (s *Scaler) sendScalingEventToState(job, id string, source state.Source, groupReqs []*GroupReq, err error) uuid.UUID {

status := s.generateEventStatus(err)

Expand All @@ -24,7 +24,7 @@ func (s *Scaler) sendScalingEventToState(job, id string, source state.Source, gr
Time: groupReqs[i].Time,
Count: groupReqs[i].Count,
Direction: groupReqs[i].Direction.String(),
Meta: meta,
Meta: groupReqs[i].Meta,
}

if err := s.state.PutScalingEvent(job, &event); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/scale/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewScaler(c *api.Client, l zerolog.Logger, state scale.Backend, strictCheck
// - the Nomad API job register response
// - the HTTP return code, used for the Sherpa API
// - any error
func (s *Scaler) Trigger(jobID string, groupReqs []*GroupReq, source state.Source, meta map[string]string) (*ScalingResponse, int, error) {
func (s *Scaler) Trigger(jobID string, groupReqs []*GroupReq, source state.Source) (*ScalingResponse, int, error) {

// In order to submit a job for scaling we need to read the entire job back to Nomad as it does
// not currently have convenience methods for changing job group counts.
Expand Down Expand Up @@ -80,19 +80,19 @@ func (s *Scaler) Trigger(jobID string, groupReqs []*GroupReq, source state.Sourc

resp, err := s.triggerNomadRegister(job)

return s.handleEndState(jobID, resp, err, groupReqs, source, meta)
return s.handleEndState(jobID, resp, err, groupReqs, source)
}

func (s *Scaler) handleEndState(job string, apiResp *api.JobRegisterResponse, apiErr error, groupReqs []*GroupReq,
source state.Source, meta map[string]string) (*ScalingResponse, int, error) {
source state.Source) (*ScalingResponse, int, error) {

eval := ""

if apiResp != nil {
eval = apiResp.EvalID
}

scaleID := s.sendScalingEventToState(job, eval, source, groupReqs, apiErr, meta)
scaleID := s.sendScalingEventToState(job, eval, source, groupReqs, apiErr)

if apiErr != nil {
return nil, http.StatusInternalServerError, apiErr
Expand Down
18 changes: 14 additions & 4 deletions pkg/scale/v1/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ func (s *Scale) InJobGroup(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
newReq := &scale.GroupReq{Direction: scale.DirectionIn, GroupName: groupID, Time: helper.GenerateEventTimestamp()}
newReq := &scale.GroupReq{
Direction: scale.DirectionIn,
GroupName: groupID,
Time: helper.GenerateEventTimestamp(),
Meta: body.Meta,
}

if s.scaler.JobGroupIsDeploying(jobID, groupID) {
s.logger.Info().
Expand Down Expand Up @@ -114,7 +119,7 @@ func (s *Scale) InJobGroup(w http.ResponseWriter, r *http.Request) {
return
}

scaleResp, respCode, err := s.scaler.Trigger(jobID, []*scale.GroupReq{newReq}, state.SourceAPI, body.Meta)
scaleResp, respCode, err := s.scaler.Trigger(jobID, []*scale.GroupReq{newReq}, state.SourceAPI)
if err != nil {
s.logger.Error().
Err(err).
Expand Down Expand Up @@ -160,7 +165,12 @@ func (s *Scale) OutJobGroup(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
newReq := &scale.GroupReq{Direction: scale.DirectionOut, GroupName: groupID, Time: helper.GenerateEventTimestamp()}
newReq := &scale.GroupReq{
Direction: scale.DirectionOut,
GroupName: groupID,
Time: helper.GenerateEventTimestamp(),
Meta: body.Meta,
}

if s.scaler.JobGroupIsDeploying(jobID, groupID) {
s.logger.Info().
Expand Down Expand Up @@ -220,7 +230,7 @@ func (s *Scale) OutJobGroup(w http.ResponseWriter, r *http.Request) {
return
}

scaleResp, respCode, err := s.scaler.Trigger(jobID, []*scale.GroupReq{newReq}, state.SourceAPI, body.Meta)
scaleResp, respCode, err := s.scaler.Trigger(jobID, []*scale.GroupReq{newReq}, state.SourceAPI)
if err != nil {
s.logger.Error().
Err(err).
Expand Down

0 comments on commit 5d8737f

Please sign in to comment.