diff --git a/CHANGELOG.md b/CHANGELOG.md
index 329ce49..c2816c9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,12 @@
+## 0.4.0 (Unreleased)
+
+IMPROVEMENTS:
+ * Metrics to measure policy backend request latencies [[GH-93]](https://github.com/jrasell/sherpa/pull/93)
+ * Metrics to measure scaling state backend request latencies [[GH-94]](https://github.com/jrasell/sherpa/pull/94)
+
+REFACTOR:
+ * Move the system API endpoint into the `server/endpoints/v1` package [[GH-99]](https://github.com/jrasell/sherpa/pull/99)
+
## 0.3.0 (4 November, 2019)
IMPROVEMENTS:
diff --git a/docs/configuration/telemetry.md b/docs/configuration/telemetry.md
index 976b851..a247d34 100644
--- a/docs/configuration/telemetry.md
+++ b/docs/configuration/telemetry.md
@@ -77,6 +77,189 @@ Runtime metrics allow operators to get insight into how the Sherpa server proces
+# Policy Backend Metrics
+
+Policy backend metrics allow operators to get insight into how the policy storage backend is functioning.
+
+| Metric | Description | Unit | Type |
+| --- | --- | --- | --- |
+| `sherpa.policy.memory.get_policies` | Time taken to list all stored scaling policies from the memory backend | Milliseconds | Summary |
+| `sherpa.policy.memory.get_job_policy` | Time taken to get a job scaling policy from the memory backend | Milliseconds | Summary |
+| `sherpa.policy.memory.get_job_group_policy` | Time taken to get a job group scaling policy from the memory backend | Milliseconds | Summary |
+| `sherpa.policy.memory.put_job_policy` | Time taken to put a job scaling policy in the memory backend | Milliseconds | Summary |
+| `sherpa.policy.memory.put_job_group_policy` | Time taken to put a job group scaling policy in the memory backend | Milliseconds | Summary |
+| `sherpa.policy.memory.delete_job_policy` | Time taken to delete a job scaling policy from the memory backend | Milliseconds | Summary |
+| `sherpa.policy.memory.delete_job_group_policy` | Time taken to delete a job group scaling policy from the memory backend | Milliseconds | Summary |
+| `sherpa.policy.consul.get_policies` | Time taken to list all stored scaling policies from the Consul backend | Milliseconds | Summary |
+| `sherpa.policy.consul.get_job_policy` | Time taken to get a job scaling policy from the Consul backend | Milliseconds | Summary |
+| `sherpa.policy.consul.get_job_group_policy` | Time taken to get a job group scaling policy from the Consul backend | Milliseconds | Summary |
+| `sherpa.policy.consul.put_job_policy` | Time taken to put a job scaling policy in the Consul backend | Milliseconds | Summary |
+| `sherpa.policy.consul.put_job_group_policy` | Time taken to put a job group scaling policy in the Consul backend | Milliseconds | Summary |
+| `sherpa.policy.consul.delete_job_policy` | Time taken to delete a job scaling policy from the Consul backend | Milliseconds | Summary |
+| `sherpa.policy.consul.delete_job_group_policy` | Time taken to delete a job group scaling policy from the Consul backend | Milliseconds | Summary |
+
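+The timings above are recorded inside the storage backends themselves. As a
+minimal sketch (the `getPolicies` function below is illustrative and not part
+of the Sherpa codebase), each backend method defers a `go-metrics`
+`MeasureSince` call keyed by backend and operation, and the telemetry sink
+reports the elapsed time as a summary in milliseconds:
+
+```go
+package main
+
+import (
+	"time"
+
+	"github.com/armon/go-metrics"
+)
+
+// The key mirrors the one used by the Consul policy backend; the sink joins
+// the parts under the service prefix, giving "sherpa.policy.consul.get_policies".
+var metricKeyGetPolicies = []string{"policy", "consul", "get_policies"}
+
+// getPolicies stands in for a real backend method; the deferred call records
+// how long the method took once it returns.
+func getPolicies() {
+	defer metrics.MeasureSince(metricKeyGetPolicies, time.Now())
+	// ... perform the backend request ...
+}
+
+func main() { getPolicies() }
+```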
+
+# Scaling State Backend Metrics
+
+Scaling state backend metrics allow operators to get insight into how the scaling state backend is functioning.
+
+| Metric | Description | Unit | Type |
+| --- | --- | --- | --- |
+| `sherpa.scale.state.memory.get_events` | Time taken to list all stored scaling activities from the memory backend | Milliseconds | Summary |
+| `sherpa.scale.state.memory.get_event` | Time taken to get a stored scaling activity from the memory backend | Milliseconds | Summary |
+| `sherpa.scale.state.memory.get_latest_events` | Time taken to list the latest stored scaling activities from the memory backend | Milliseconds | Summary |
+| `sherpa.scale.state.memory.get_latest_event` | Time taken to get the latest scaling activity for a job group from the memory backend | Milliseconds | Summary |
+| `sherpa.scale.state.memory.put_event` | Time taken to put a scaling activity in the memory backend | Milliseconds | Summary |
+| `sherpa.scale.state.memory.gc` | Time taken to run the scaling state garbage collector for the memory backend | Milliseconds | Summary |
+| `sherpa.scale.state.consul.get_events` | Time taken to list all stored scaling activities from the Consul backend | Milliseconds | Summary |
+| `sherpa.scale.state.consul.get_event` | Time taken to get a stored scaling activity from the Consul backend | Milliseconds | Summary |
+| `sherpa.scale.state.consul.get_latest_events` | Time taken to list the latest stored scaling activities from the Consul backend | Milliseconds | Summary |
+| `sherpa.scale.state.consul.get_latest_event` | Time taken to get the latest scaling activity for a job group from the Consul backend | Milliseconds | Summary |
+| `sherpa.scale.state.consul.put_event` | Time taken to put a scaling activity in the Consul backend | Milliseconds | Summary |
+| `sherpa.scale.state.consul.gc` | Time taken to run the scaling state garbage collector for the Consul backend | Milliseconds | Summary |
+
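+Like the policy backend metrics, these timings are reported as summaries and surface alongside the existing Sherpa metrics through whichever telemetry sinks are configured.
+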
# Autoscale Metrics
Autoscale metrics allow operators to get insight into how the autoscaler is functioning.
diff --git a/pkg/policy/backend/consul/consul.go b/pkg/policy/backend/consul/consul.go
index 10eb625..262adff 100644
--- a/pkg/policy/backend/consul/consul.go
+++ b/pkg/policy/backend/consul/consul.go
@@ -3,7 +3,9 @@ package consul
import (
"encoding/json"
"strings"
+ "time"
+ "github.com/armon/go-metrics"
"github.com/hashicorp/consul/api"
"github.com/jrasell/sherpa/pkg/client"
"github.com/jrasell/sherpa/pkg/policy"
@@ -18,6 +20,17 @@ const (
baseKVPath = "policies/"
)
+// Define our metric keys.
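+// When emitted through the telemetry sink, each key becomes a dotted metric
+// name under the service prefix, e.g. "sherpa.policy.consul.get_policies".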
+var (
+ metricKeyGetPolicies = []string{"policy", "consul", "get_policies"}
+ metricKeyGetJobPolicy = []string{"policy", "consul", "get_job_policy"}
+ metricKeyGetJobGroupPolicy = []string{"policy", "consul", "get_job_group_policy"}
+ metricKeyPutJobPolicy = []string{"policy", "consul", "put_job_policy"}
+ metricKeyPutJobGroupPolicy = []string{"policy", "consul", "put_job_group_policy"}
+ metricKeyDeleteJobPolicy = []string{"policy", "consul", "delete_job_policy"}
+ metricKeyDeleteJobGroupPolicy = []string{"policy", "consul", "delete_job_group_policy"}
+)
+
type PolicyBackend struct {
path string
logger zerolog.Logger
@@ -36,6 +49,8 @@ func NewConsulPolicyBackend(log zerolog.Logger, path string) backend.PolicyBacke
}
func (p *PolicyBackend) GetPolicies() (map[string]map[string]*policy.GroupScalingPolicy, error) {
+ defer metrics.MeasureSince(metricKeyGetPolicies, time.Now())
+
kv, _, err := p.kv.List(p.path, nil)
if err != nil {
return nil, err
@@ -69,6 +84,8 @@ func (p *PolicyBackend) GetPolicies() (map[string]map[string]*policy.GroupScalin
}
func (p *PolicyBackend) GetJobPolicy(job string) (map[string]*policy.GroupScalingPolicy, error) {
+ defer metrics.MeasureSince(metricKeyGetJobPolicy, time.Now())
+
kv, _, err := p.kv.List(p.path+job, nil)
if err != nil {
return nil, err
@@ -96,6 +113,8 @@ func (p *PolicyBackend) GetJobPolicy(job string) (map[string]*policy.GroupScalin
}
func (p *PolicyBackend) GetJobGroupPolicy(job, group string) (*policy.GroupScalingPolicy, error) {
+ defer metrics.MeasureSince(metricKeyGetJobGroupPolicy, time.Now())
+
kv, _, err := p.kv.Get(p.path+job+"/"+group, nil)
if err != nil {
return nil, err
@@ -115,6 +134,8 @@ func (p *PolicyBackend) GetJobGroupPolicy(job, group string) (*policy.GroupScali
}
func (p *PolicyBackend) PutJobPolicy(job string, groupPolicies map[string]*policy.GroupScalingPolicy) error {
+ defer metrics.MeasureSince(metricKeyPutJobPolicy, time.Now())
+
var kvOpts []*api.KVTxnOp // nolint:prealloc
for group, pol := range groupPolicies {
@@ -145,6 +166,8 @@ func (p *PolicyBackend) PutJobPolicy(job string, groupPolicies map[string]*polic
}
func (p *PolicyBackend) PutJobGroupPolicy(job, group string, pol *policy.GroupScalingPolicy) error {
+ defer metrics.MeasureSince(metricKeyPutJobGroupPolicy, time.Now())
+
marshal, err := json.Marshal(pol)
if err != nil {
return err
@@ -160,11 +183,15 @@ func (p *PolicyBackend) PutJobGroupPolicy(job, group string, pol *policy.GroupSc
}
func (p *PolicyBackend) DeleteJobPolicy(job string) error {
+ defer metrics.MeasureSince(metricKeyDeleteJobPolicy, time.Now())
+
_, err := p.kv.DeleteTree(p.path+job, nil)
return err
}
func (p *PolicyBackend) DeleteJobGroupPolicy(job, group string) error {
+ defer metrics.MeasureSince(metricKeyDeleteJobGroupPolicy, time.Now())
+
_, err := p.kv.Delete(p.path+job+"/"+group, nil)
return err
}
diff --git a/pkg/policy/backend/memory/memory.go b/pkg/policy/backend/memory/memory.go
index 815bba3..76e2eab 100644
--- a/pkg/policy/backend/memory/memory.go
+++ b/pkg/policy/backend/memory/memory.go
@@ -2,13 +2,26 @@ package memory
import (
"sync"
+ "time"
+ "github.com/armon/go-metrics"
"github.com/jrasell/sherpa/pkg/policy"
"github.com/jrasell/sherpa/pkg/policy/backend"
)
var _ backend.PolicyBackend = (*PolicyBackend)(nil)
+// Define our metric keys.
+var (
+ metricKeyGetPolicies = []string{"policy", "memory", "get_policies"}
+ metricKeyGetJobPolicy = []string{"policy", "memory", "get_job_policy"}
+ metricKeyGetJobGroupPolicy = []string{"policy", "memory", "get_job_group_policy"}
+ metricKeyPutJobPolicy = []string{"policy", "memory", "put_job_policy"}
+ metricKeyPutJobGroupPolicy = []string{"policy", "memory", "put_job_group_policy"}
+ metricKeyDeleteJobPolicy = []string{"policy", "memory", "delete_job_policy"}
+ metricKeyDeleteJobGroupPolicy = []string{"policy", "memory", "delete_job_group_policy"}
+)
+
type PolicyBackend struct {
policies map[string]map[string]*policy.GroupScalingPolicy
sync.RWMutex
@@ -21,6 +34,8 @@ func NewJobScalingPolicies() backend.PolicyBackend {
}
func (p *PolicyBackend) GetPolicies() (map[string]map[string]*policy.GroupScalingPolicy, error) {
+ defer metrics.MeasureSince(metricKeyGetPolicies, time.Now())
+
p.RLock()
val := p.policies
p.RUnlock()
@@ -28,6 +43,8 @@ func (p *PolicyBackend) GetPolicies() (map[string]map[string]*policy.GroupScalin
}
func (p *PolicyBackend) GetJobPolicy(job string) (map[string]*policy.GroupScalingPolicy, error) {
+ defer metrics.MeasureSince(metricKeyGetJobPolicy, time.Now())
+
p.RLock()
defer p.RUnlock()
@@ -38,6 +55,8 @@ func (p *PolicyBackend) GetJobPolicy(job string) (map[string]*policy.GroupScalin
}
func (p *PolicyBackend) GetJobGroupPolicy(job, group string) (*policy.GroupScalingPolicy, error) {
+ defer metrics.MeasureSince(metricKeyGetJobGroupPolicy, time.Now())
+
p.RLock()
defer p.RUnlock()
@@ -48,6 +67,8 @@ func (p *PolicyBackend) GetJobGroupPolicy(job, group string) (*policy.GroupScali
}
func (p *PolicyBackend) PutJobPolicy(job string, policies map[string]*policy.GroupScalingPolicy) error {
+ defer metrics.MeasureSince(metricKeyPutJobPolicy, time.Now())
+
p.Lock()
defer p.Unlock()
@@ -62,6 +83,8 @@ func (p *PolicyBackend) PutJobPolicy(job string, policies map[string]*policy.Gro
}
func (p *PolicyBackend) PutJobGroupPolicy(job, group string, policies *policy.GroupScalingPolicy) error {
+ defer metrics.MeasureSince(metricKeyPutJobGroupPolicy, time.Now())
+
p.Lock()
defer p.Unlock()
@@ -76,6 +99,8 @@ func (p *PolicyBackend) PutJobGroupPolicy(job, group string, policies *policy.Gr
}
func (p *PolicyBackend) DeleteJobGroupPolicy(job, group string) error {
+ defer metrics.MeasureSince(metricKeyDeleteJobGroupPolicy, time.Now())
+
p.Lock()
defer p.Unlock()
@@ -86,6 +111,8 @@ func (p *PolicyBackend) DeleteJobGroupPolicy(job, group string) error {
}
func (p *PolicyBackend) DeleteJobPolicy(job string) error {
+ defer metrics.MeasureSince(metricKeyDeleteJobPolicy, time.Now())
+
p.Lock()
defer p.Unlock()
diff --git a/pkg/system/v1/system.go b/pkg/server/endpoints/v1/system.go
similarity index 73%
rename from pkg/system/v1/system.go
rename to pkg/server/endpoints/v1/system.go
index b1a5090..c67049d 100644
--- a/pkg/system/v1/system.go
+++ b/pkg/server/endpoints/v1/system.go
@@ -25,7 +25,7 @@ const (
defaultStorageBackendConsul = "Consul"
)
-type System struct {
+type SystemServer struct {
logger zerolog.Logger
member *cluster.Member
nomad *api.Client
@@ -55,8 +55,8 @@ type SystemLeaderResp struct {
LeaderClusterAddress string
}
-func NewSystemServer(l zerolog.Logger, nomad *api.Client, server *serverCfg.Config, tel *metrics.InmemSink, mem *cluster.Member) *System {
- return &System{
+func NewSystemServer(l zerolog.Logger, nomad *api.Client, server *serverCfg.Config, tel *metrics.InmemSink, mem *cluster.Member) *SystemServer {
+ return &SystemServer{
logger: l,
member: mem,
nomad: nomad,
@@ -65,34 +65,34 @@ func NewSystemServer(l zerolog.Logger, nomad *api.Client, server *serverCfg.Conf
}
}
-func (h *System) GetHealth(w http.ResponseWriter, r *http.Request) {
+func (s *SystemServer) GetHealth(w http.ResponseWriter, r *http.Request) {
writeJSONResponse(w, []byte(defaultHealthResp))
}
-func (h *System) GetInfo(w http.ResponseWriter, r *http.Request) {
+func (s *SystemServer) GetInfo(w http.ResponseWriter, r *http.Request) {
resp := &SystemInfoResp{
- NomadAddress: h.nomad.Address(),
- StrictPolicyChecking: h.server.StrictPolicyChecking,
- InternalAutoScalingEngine: h.server.InternalAutoScaler,
+ NomadAddress: s.nomad.Address(),
+ StrictPolicyChecking: s.server.StrictPolicyChecking,
+ InternalAutoScalingEngine: s.server.InternalAutoScaler,
PolicyEngine: defaultDisabledPolicyResp,
StorageBackend: defaultStorageBackend,
}
- if h.server.ConsulStorageBackend {
+ if s.server.ConsulStorageBackend {
resp.StorageBackend = defaultStorageBackendConsul
}
- if h.server.APIPolicyEngine {
+ if s.server.APIPolicyEngine {
resp.PolicyEngine = defaultAPIPolicyResp
}
- if h.server.NomadMetaPolicyEngine {
+ if s.server.NomadMetaPolicyEngine {
resp.PolicyEngine = defaultMetaPolicyResp
}
out, err := json.Marshal(resp)
if err != nil {
- h.logger.Error().Err(err).Msg("failed to marshal HTTP response")
+ s.logger.Error().Err(err).Msg("failed to marshal HTTP response")
http.Error(w, "", http.StatusInternalServerError)
return
}
@@ -100,26 +100,26 @@ func (h *System) GetInfo(w http.ResponseWriter, r *http.Request) {
writeJSONResponse(w, out)
}
-func (h *System) GetLeader(w http.ResponseWriter, r *http.Request) {
+func (s *SystemServer) GetLeader(w http.ResponseWriter, r *http.Request) {
// Pull the leadership information from the local member.
- l, addr, advAddr, err := h.member.Leader()
+ l, addr, advAddr, err := s.member.Leader()
if err != nil {
- h.logger.Error().Err(err).Msg("failed to get leadership information")
+ s.logger.Error().Err(err).Msg("failed to get leadership information")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp := SystemLeaderResp{
IsSelf: l,
- HAEnabled: h.member.IsHA(),
+ HAEnabled: s.member.IsHA(),
LeaderAddress: addr,
LeaderClusterAddress: advAddr,
}
out, err := json.Marshal(resp)
if err != nil {
- h.logger.Error().Err(err).Msg("failed to marshal HTTP response")
+ s.logger.Error().Err(err).Msg("failed to marshal HTTP response")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -127,17 +127,17 @@ func (h *System) GetLeader(w http.ResponseWriter, r *http.Request) {
writeJSONResponse(w, out)
}
-func (h *System) GetMetrics(w http.ResponseWriter, r *http.Request) {
- metricData, err := h.telemetry.DisplayMetrics(w, r)
+func (s *SystemServer) GetMetrics(w http.ResponseWriter, r *http.Request) {
+ metricData, err := s.telemetry.DisplayMetrics(w, r)
if err != nil {
- h.logger.Error().Err(err).Msg("failed to get latest telemetry data")
+ s.logger.Error().Err(err).Msg("failed to get latest telemetry data")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
out, err := json.Marshal(metricData)
if err != nil {
- h.logger.Error().Err(err).Msg("failed to marshal HTTP response")
+ s.logger.Error().Err(err).Msg("failed to marshal HTTP response")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
diff --git a/pkg/system/v1/system_test.go b/pkg/server/endpoints/v1/system_test.go
similarity index 100%
rename from pkg/system/v1/system_test.go
rename to pkg/server/endpoints/v1/system_test.go
diff --git a/pkg/server/routes.go b/pkg/server/routes.go
index d7a83ac..f4dbb6d 100644
--- a/pkg/server/routes.go
+++ b/pkg/server/routes.go
@@ -7,11 +7,10 @@ import (
scaleV1 "github.com/jrasell/sherpa/pkg/scale/v1"
v1 "github.com/jrasell/sherpa/pkg/server/endpoints/v1"
"github.com/jrasell/sherpa/pkg/server/router"
- systemV1 "github.com/jrasell/sherpa/pkg/system/v1"
)
type routes struct {
- System *systemV1.System
+ System *v1.SystemServer
Policy *policyV1.Policy
Scale *scaleV1.Scale
UI *v1.UIServer
@@ -112,7 +111,7 @@ func (h *HTTPServer) setupScaleRoutes() []router.Route {
func (h *HTTPServer) setupSystemRoutes() []router.Route {
h.logger.Debug().Msg("setting up server system routes")
- h.routes.System = systemV1.NewSystemServer(h.logger, h.nomad, h.cfg.Server, h.telemetry, h.clusterMember)
+ h.routes.System = v1.NewSystemServer(h.logger, h.nomad, h.cfg.Server, h.telemetry, h.clusterMember)
return router.Routes{
router.Route{
diff --git a/pkg/state/scale/consul/consul.go b/pkg/state/scale/consul/consul.go
index c29327f..210e0ff 100644
--- a/pkg/state/scale/consul/consul.go
+++ b/pkg/state/scale/consul/consul.go
@@ -6,6 +6,7 @@ import (
"strings"
"time"
+ "github.com/armon/go-metrics"
"github.com/gofrs/uuid"
"github.com/hashicorp/consul/api"
"github.com/jrasell/sherpa/pkg/client"
@@ -23,6 +24,16 @@ const (
latestEventsKVPath = "state/latest-events/"
)
+// Define our metric keys.
+var (
+ metricKeyGetEvents = []string{"scale", "state", "consul", "get_events"}
+ metricKeyGetEvent = []string{"scale", "state", "consul", "get_event"}
+ metricKeyGetLatestEvents = []string{"scale", "state", "consul", "get_latest_events"}
+ metricKeyGetLatestEvent = []string{"scale", "state", "consul", "get_latest_event"}
+ metricKeyPutEvent = []string{"scale", "state", "consul", "put_event"}
+ metricKeyGC = []string{"scale", "state", "consul", "gc"}
+)
+
type StateBackend struct {
basePath string
eventsPath string
@@ -47,6 +58,8 @@ func NewStateBackend(log zerolog.Logger, path string) scale.Backend {
}
func (s StateBackend) GetLatestScalingEvents() (map[string]*state.ScalingEvent, error) {
+ defer metrics.MeasureSince(metricKeyGetLatestEvents, time.Now())
+
kv, _, err := s.kv.List(s.latestEventsPath, nil)
if err != nil {
return nil, err
@@ -73,6 +86,8 @@ func (s StateBackend) GetLatestScalingEvents() (map[string]*state.ScalingEvent,
}
func (s StateBackend) GetLatestScalingEvent(job, group string) (*state.ScalingEvent, error) {
+ defer metrics.MeasureSince(metricKeyGetLatestEvent, time.Now())
+
kv, _, err := s.kv.Get(s.latestEventsPath+job+":"+group, nil)
if err != nil {
return nil, err
@@ -90,6 +105,8 @@ func (s StateBackend) GetLatestScalingEvent(job, group string) (*state.ScalingEv
}
func (s StateBackend) GetScalingEvents() (map[uuid.UUID]map[string]*state.ScalingEvent, error) {
+ defer metrics.MeasureSince(metricKeyGetEvents, time.Now())
+
kv, _, err := s.kv.List(s.eventsPath, nil)
if err != nil {
return nil, err
@@ -122,6 +139,8 @@ func (s StateBackend) GetScalingEvents() (map[uuid.UUID]map[string]*state.Scalin
}
func (s StateBackend) GetScalingEvent(id uuid.UUID) (map[string]*state.ScalingEvent, error) {
+ defer metrics.MeasureSince(metricKeyGetEvent, time.Now())
+
kv, _, err := s.kv.List(s.eventsPath+id.String(), nil)
if err != nil {
return nil, err
@@ -147,6 +166,7 @@ func (s StateBackend) GetScalingEvent(id uuid.UUID) (map[string]*state.ScalingEv
}
func (s StateBackend) PutScalingEvent(job string, event *state.ScalingEventMessage) error {
+ defer metrics.MeasureSince(metricKeyPutEvent, time.Now())
sEntry := &state.ScalingEvent{
EvalID: event.EvalID,
@@ -181,6 +201,9 @@ func (s StateBackend) PutScalingEvent(job string, event *state.ScalingEventMessa
}
func (s StateBackend) RunGarbageCollection() {
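+ // Capture a single timestamp so the latency metric and the GC threshold
+ // calculation below share the same reference point.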
+ t := time.Now()
+ defer metrics.MeasureSince(metricKeyGC, t)
+
kv, _, err := s.kv.List(s.eventsPath, nil)
if err != nil {
s.logger.Error().Err(err).Msg("GC failed to list events in backend store")
@@ -190,7 +213,7 @@ func (s StateBackend) RunGarbageCollection() {
return
}
- gc := time.Now().UTC().UnixNano() - s.gcThreshold
+ gc := t.UTC().UnixNano() - s.gcThreshold
for i := range kv {
diff --git a/pkg/state/scale/memory/memory.go b/pkg/state/scale/memory/memory.go
index a36be3e..270b9b7 100644
--- a/pkg/state/scale/memory/memory.go
+++ b/pkg/state/scale/memory/memory.go
@@ -4,6 +4,7 @@ import (
"sync"
"time"
+ "github.com/armon/go-metrics"
"github.com/gofrs/uuid"
"github.com/jrasell/sherpa/pkg/state"
"github.com/jrasell/sherpa/pkg/state/scale"
@@ -11,6 +12,16 @@ import (
var _ scale.Backend = (*StateBackend)(nil)
+// Define our metric keys.
+var (
+ metricKeyGetEvents = []string{"scale", "state", "memory", "get_events"}
+ metricKeyGetEvent = []string{"scale", "state", "memory", "get_event"}
+ metricKeyGetLatestEvents = []string{"scale", "state", "memory", "get_latest_events"}
+ metricKeyGetLatestEvent = []string{"scale", "state", "memory", "get_latest_event"}
+ metricKeyPutEvent = []string{"scale", "state", "memory", "put_event"}
+ metricKeyGC = []string{"scale", "state", "memory", "gc"}
+)
+
type StateBackend struct {
gcThreshold int64
state *state.ScalingState
@@ -28,6 +39,8 @@ func NewStateBackend() scale.Backend {
}
func (s *StateBackend) GetLatestScalingEvents() (map[string]*state.ScalingEvent, error) {
+ defer metrics.MeasureSince(metricKeyGetLatestEvents, time.Now())
+
s.RLock()
latest := s.state.LatestEvents
s.RUnlock()
@@ -35,6 +48,8 @@ func (s *StateBackend) GetLatestScalingEvents() (map[string]*state.ScalingEvent,
}
func (s *StateBackend) GetLatestScalingEvent(job, group string) (*state.ScalingEvent, error) {
+ defer metrics.MeasureSince(metricKeyGetLatestEvent, time.Now())
+
s.RLock()
latest := s.state.LatestEvents[job+":"+group]
s.RUnlock()
@@ -42,6 +57,8 @@ func (s *StateBackend) GetLatestScalingEvent(job, group string) (*state.ScalingE
}
func (s *StateBackend) GetScalingEvents() (map[uuid.UUID]map[string]*state.ScalingEvent, error) {
+ defer metrics.MeasureSince(metricKeyGetEvents, time.Now())
+
s.RLock()
events := s.state.Events
s.RUnlock()
@@ -49,6 +66,8 @@ func (s *StateBackend) GetScalingEvents() (map[uuid.UUID]map[string]*state.Scali
}
func (s *StateBackend) PutScalingEvent(job string, event *state.ScalingEventMessage) error {
+ defer metrics.MeasureSince(metricKeyPutEvent, time.Now())
+
s.Lock()
defer s.Unlock()
@@ -71,6 +90,8 @@ func (s *StateBackend) PutScalingEvent(job string, event *state.ScalingEventMess
}
func (s *StateBackend) GetScalingEvent(id uuid.UUID) (map[string]*state.ScalingEvent, error) {
+ defer metrics.MeasureSince(metricKeyGetEvent, time.Now())
+
s.RLock()
e := s.state.Events[id]
s.RUnlock()
@@ -78,8 +99,10 @@ func (s *StateBackend) GetScalingEvent(id uuid.UUID) (map[string]*state.ScalingE
}
func (s *StateBackend) RunGarbageCollection() {
+ t := time.Now()
+ defer metrics.MeasureSince(metricKeyGC, t)
- gc := time.Now().UTC().UnixNano() - s.gcThreshold
+ gc := t.UTC().UnixNano() - s.gcThreshold
newEventState := make(map[uuid.UUID]map[string]*state.ScalingEvent)