From 5d8737f8f9f8c5aa398cded023457b8bfd5ce22d Mon Sep 17 00:00:00 2001
From: James Rasell
Date: Fri, 1 Nov 2019 10:26:21 +0100
Subject: [PATCH] scale-meta: meta should be unique per job group.

The autoscaler works by analysing all groups within a job and then
submitting the combined changes to the scaler process for action. The
metadata sent with a request should therefore be part of the group
request, not the function call, allowing the metadata to be unique per
group.
---
 pkg/autoscale/autoscale.go |  7 ++++---
 pkg/scale/consts.go        |  6 +++++-
 pkg/scale/event.go         |  4 ++--
 pkg/scale/scale.go         |  8 ++++----
 pkg/scale/v1/scale.go      | 18 ++++++++++++++----
 5 files changed, 29 insertions(+), 14 deletions(-)

diff --git a/pkg/autoscale/autoscale.go b/pkg/autoscale/autoscale.go
index 52ac331..7a13e04 100644
--- a/pkg/autoscale/autoscale.go
+++ b/pkg/autoscale/autoscale.go
@@ -36,7 +36,6 @@ func (a *AutoScale) autoscaleJob(jobID string, policies map[string]*policy.Group
 	}
 
 	var scaleReq []*scale.GroupReq // nolint:prealloc
-	meta := make(map[string]string)
 
 	for group, pol := range policies {
 
@@ -72,12 +71,14 @@ func (a *AutoScale) autoscaleJob(jobID string, policies map[string]*policy.Group
 			groupLogger.Debug().Msg("no scaling required")
 			continue
 		}
-		groupLogger.Debug().
+		groupLogger.Info().
 			Object("decision", decision).
 			Msg("scaling decision made and action required")
 
 		// Iterate over the resource metrics which have broken their thresholds and ensure these
 		// are added to the submission meta.
+		meta := make(map[string]string)
+
 		for name, metric := range decision.metrics {
 			updateAutoscaleMeta(group, name, metric.usage, metric.threshold, meta)
 		}
@@ -100,7 +101,7 @@ func (a *AutoScale) autoscaleJob(jobID string, policies map[string]*policy.Group
 	// If group scaling requests have been added to the array for the job that is currently being
 	// checked, trigger a scaling event.
 	if len(scaleReq) > 0 {
-		resp, _, err := a.scaler.Trigger(jobID, scaleReq, state.SourceInternalAutoscaler, meta)
+		resp, _, err := a.scaler.Trigger(jobID, scaleReq, state.SourceInternalAutoscaler)
 		if err != nil {
 			jobLogger.Error().Err(err).Msg("failed to trigger scaling of job")
 		}
diff --git a/pkg/scale/consts.go b/pkg/scale/consts.go
index 40d9ee2..8baa80d 100644
--- a/pkg/scale/consts.go
+++ b/pkg/scale/consts.go
@@ -10,7 +10,7 @@ import (
 // Scale is the interface used for scaling a Nomad job.
 type Scale interface {
 	// Trigger performs scaling of 1 or more job groups which belong to the same job.
-	Trigger(string, []*GroupReq, state.Source, map[string]string) (*ScalingResponse, int, error)
+	Trigger(string, []*GroupReq, state.Source) (*ScalingResponse, int, error)
 
 	// GetDeploymentChannel is used to return the channel where updates to Nomad deployments should
 	// be sent.
@@ -57,6 +57,10 @@ type GroupReq struct {
 	// triggered. This is to help coordinate with checks such as cooldown and ensure a single time
 	// can be used.
 	Time int64
+
+	// Meta is the metadata which is optionally submitted when requesting a scaling activity for a
+	// job group. This is free-form and can contain any information the user deems relevant.
+	Meta map[string]string
 }
 
 type ScalingResponse struct {
diff --git a/pkg/scale/event.go b/pkg/scale/event.go
index 7b78df1..18d8de6 100644
--- a/pkg/scale/event.go
+++ b/pkg/scale/event.go
@@ -5,7 +5,7 @@ import (
 	"github.com/jrasell/sherpa/pkg/state"
 )
 
-func (s *Scaler) sendScalingEventToState(job, id string, source state.Source, groupReqs []*GroupReq, err error, meta map[string]string) uuid.UUID {
+func (s *Scaler) sendScalingEventToState(job, id string, source state.Source, groupReqs []*GroupReq, err error) uuid.UUID {
 
 	status := s.generateEventStatus(err)
 
@@ -24,7 +24,7 @@ func (s *Scaler) sendScalingEventToState(job, id string, source state.Source, gr
 			Time:      groupReqs[i].Time,
 			Count:     groupReqs[i].Count,
 			Direction: groupReqs[i].Direction.String(),
-			Meta:      meta,
+			Meta:      groupReqs[i].Meta,
 		}
 
 		if err := s.state.PutScalingEvent(job, &event); err != nil {
diff --git a/pkg/scale/scale.go b/pkg/scale/scale.go
index e78bbcc..918d483 100644
--- a/pkg/scale/scale.go
+++ b/pkg/scale/scale.go
@@ -44,7 +44,7 @@ func NewScaler(c *api.Client, l zerolog.Logger, state scale.Backend, strictCheck
 //  - the Nomad API job register response
 //  - the HTTP return code, used for the Sherpa API
 //  - any error
-func (s *Scaler) Trigger(jobID string, groupReqs []*GroupReq, source state.Source, meta map[string]string) (*ScalingResponse, int, error) {
+func (s *Scaler) Trigger(jobID string, groupReqs []*GroupReq, source state.Source) (*ScalingResponse, int, error) {
 
 	// In order to submit a job for scaling we need to read the entire job back to Nomad as it does
 	// not currently have convenience methods for changing job group counts.
@@ -80,11 +80,11 @@ func (s *Scaler) Trigger(jobID string, groupReqs []*GroupReq, source state.Sourc
 
 	resp, err := s.triggerNomadRegister(job)
 
-	return s.handleEndState(jobID, resp, err, groupReqs, source, meta)
+	return s.handleEndState(jobID, resp, err, groupReqs, source)
 }
 
 func (s *Scaler) handleEndState(job string, apiResp *api.JobRegisterResponse, apiErr error, groupReqs []*GroupReq,
-	source state.Source, meta map[string]string) (*ScalingResponse, int, error) {
+	source state.Source) (*ScalingResponse, int, error) {
 
 	eval := ""
 
@@ -92,7 +92,7 @@ func (s *Scaler) handleEndState(job string, apiResp *api.JobRegisterResponse, ap
 		eval = apiResp.EvalID
 	}
 
-	scaleID := s.sendScalingEventToState(job, eval, source, groupReqs, apiErr, meta)
+	scaleID := s.sendScalingEventToState(job, eval, source, groupReqs, apiErr)
 
 	if apiErr != nil {
 		return nil, http.StatusInternalServerError, apiErr
diff --git a/pkg/scale/v1/scale.go b/pkg/scale/v1/scale.go
index 3509238..e12776c 100644
--- a/pkg/scale/v1/scale.go
+++ b/pkg/scale/v1/scale.go
@@ -55,7 +55,12 @@ func (s *Scale) InJobGroup(w http.ResponseWriter, r *http.Request) {
 		http.Error(w, err.Error(), http.StatusBadRequest)
 		return
 	}
-	newReq := &scale.GroupReq{Direction: scale.DirectionIn, GroupName: groupID, Time: helper.GenerateEventTimestamp()}
+	newReq := &scale.GroupReq{
+		Direction: scale.DirectionIn,
+		GroupName: groupID,
+		Time:      helper.GenerateEventTimestamp(),
+		Meta:      body.Meta,
+	}
 
 	if s.scaler.JobGroupIsDeploying(jobID, groupID) {
 		s.logger.Info().
@@ -114,7 +119,7 @@ func (s *Scale) InJobGroup(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	scaleResp, respCode, err := s.scaler.Trigger(jobID, []*scale.GroupReq{newReq}, state.SourceAPI, body.Meta)
+	scaleResp, respCode, err := s.scaler.Trigger(jobID, []*scale.GroupReq{newReq}, state.SourceAPI)
 	if err != nil {
 		s.logger.Error().
 			Err(err).
@@ -160,7 +165,12 @@ func (s *Scale) OutJobGroup(w http.ResponseWriter, r *http.Request) {
 		http.Error(w, err.Error(), http.StatusBadRequest)
 		return
 	}
-	newReq := &scale.GroupReq{Direction: scale.DirectionOut, GroupName: groupID, Time: helper.GenerateEventTimestamp()}
+	newReq := &scale.GroupReq{
+		Direction: scale.DirectionOut,
+		GroupName: groupID,
+		Time:      helper.GenerateEventTimestamp(),
+		Meta:      body.Meta,
+	}
 
 	if s.scaler.JobGroupIsDeploying(jobID, groupID) {
 		s.logger.Info().
@@ -220,7 +230,7 @@ func (s *Scale) OutJobGroup(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	scaleResp, respCode, err := s.scaler.Trigger(jobID, []*scale.GroupReq{newReq}, state.SourceAPI, body.Meta)
+	scaleResp, respCode, err := s.scaler.Trigger(jobID, []*scale.GroupReq{newReq}, state.SourceAPI)
 	if err != nil {
 		s.logger.Error().
 			Err(err).
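
---

For illustration, below is a minimal sketch of how a caller might build per-group
requests under the new Trigger signature, with each GroupReq carrying its own
Meta map. This is not part of the patch and rests on assumptions: the import
path is inferred from the repository layout, the job name, group names, and
meta values are invented, time.Now stands in for helper.GenerateEventTimestamp
(whose implementation is not shown here), and the GroupReq Count field is
omitted for brevity.

package example

import (
	"fmt"
	"time"

	// Import paths assumed from the repository layout (module
	// github.com/jrasell/sherpa, packages under pkg/).
	"github.com/jrasell/sherpa/pkg/scale"
	"github.com/jrasell/sherpa/pkg/state"
)

// scaleTwoGroups submits a single Trigger call covering two groups of the
// same hypothetical job, each group carrying its own meta.
func scaleTwoGroups(scaler scale.Scale) error {
	// A single timestamp is shared across the request so cooldown checks
	// see one consistent time; time.Now is a stand-in for the project's
	// helper.GenerateEventTimestamp.
	now := time.Now().UTC().UnixNano()

	reqs := []*scale.GroupReq{
		{
			Direction: scale.DirectionOut,
			GroupName: "cache",
			Time:      now,
			// Per-group meta: each group can now record the metric that
			// drove its own decision, which a single shared map could not.
			Meta: map[string]string{"cache-cpu-usage-percent": "91"},
		},
		{
			Direction: scale.DirectionOut,
			GroupName: "worker",
			Time:      now,
			Meta:      map[string]string{"worker-mem-usage-percent": "87"},
		},
	}

	// Trigger no longer accepts a meta argument; the meta travels inside
	// each GroupReq and is written to the stored scaling event per group.
	resp, httpCode, err := scaler.Trigger("example-job", reqs, state.SourceAPI)
	if err != nil {
		return err
	}

	fmt.Printf("scaling submitted: code=%d response=%+v\n", httpCode, resp)
	return nil
}

Because the meta now lives on each request, the event written by
sendScalingEventToState records the "cache" metrics separately from the
"worker" metrics, rather than one merged map covering the whole job.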