diff --git a/metrics/counter_float64.go b/metrics/counter_float64.go new file mode 100644 index 000000000000..f05f62fed632 --- /dev/null +++ b/metrics/counter_float64.go @@ -0,0 +1,153 @@ +package metrics + +import ( + "sync" +) + +// CounterFloat64 holds a float64 value that can be incremented and decremented. +type CounterFloat64 interface { + Clear() + Count() float64 + Dec(float64) + Inc(float64) + Snapshot() CounterFloat64 +} + +// GetOrRegisterCounterFloat64 returns an existing CounterFloat64 or constructs and registers +// a new StandardCounterFloat64. +func GetOrRegisterCounterFloat64(name string, r Registry) CounterFloat64 { + if nil == r { + r = DefaultRegistry + } + return r.GetOrRegister(name, NewCounterFloat64).(CounterFloat64) +} + +// GetOrRegisterCounterFloat64Forced returns an existing CounterFloat64 or constructs and registers a +// new CounterFloat64 no matter the global switch is enabled or not. +// Be sure to unregister the counter from the registry once it is of no use to +// allow for garbage collection. +func GetOrRegisterCounterFloat64Forced(name string, r Registry) CounterFloat64 { + if nil == r { + r = DefaultRegistry + } + return r.GetOrRegister(name, NewCounterFloat64Forced).(CounterFloat64) +} + +// NewCounterFloat64 constructs a new StandardCounterFloat64. +func NewCounterFloat64() CounterFloat64 { + if !Enabled { + return NilCounterFloat64{} + } + return &StandardCounterFloat64{count: 0.0} +} + +// NewCounterFloat64Forced constructs a new StandardCounterFloat64 and returns it no matter if +// the global switch is enabled or not. +func NewCounterFloat64Forced() CounterFloat64 { + return &StandardCounterFloat64{count: 0.0} +} + +// NewRegisteredCounterFloat64 constructs and registers a new StandardCounterFloat64. +func NewRegisteredCounterFloat64(name string, r Registry) CounterFloat64 { + c := NewCounterFloat64() + if nil == r { + r = DefaultRegistry + } + r.Register(name, c) + return c +} + +// NewRegisteredCounterFloat64Forced constructs and registers a new StandardCounterFloat64 +// and launches a goroutine no matter the global switch is enabled or not. +// Be sure to unregister the counter from the registry once it is of no use to +// allow for garbage collection. +func NewRegisteredCounterFloat64Forced(name string, r Registry) CounterFloat64 { + c := NewCounterFloat64Forced() + if nil == r { + r = DefaultRegistry + } + r.Register(name, c) + return c +} + +// CounterFloat64Snapshot is a read-only copy of another CounterFloat64. +type CounterFloat64Snapshot float64 + +// Clear panics. +func (CounterFloat64Snapshot) Clear() { + panic("Clear called on a CounterFloat64Snapshot") +} + +// Count returns the value at the time the snapshot was taken. +func (c CounterFloat64Snapshot) Count() float64 { return float64(c) } + +// Dec panics. +func (CounterFloat64Snapshot) Dec(float64) { + panic("Dec called on a CounterFloat64Snapshot") +} + +// Inc panics. +func (CounterFloat64Snapshot) Inc(float64) { + panic("Inc called on a CounterFloat64Snapshot") +} + +// Snapshot returns the snapshot. +func (c CounterFloat64Snapshot) Snapshot() CounterFloat64 { return c } + +// NilCounterFloat64 is a no-op CounterFloat64. +type NilCounterFloat64 struct{} + +// Clear is a no-op. +func (NilCounterFloat64) Clear() {} + +// Count is a no-op. +func (NilCounterFloat64) Count() float64 { return 0.0 } + +// Dec is a no-op. +func (NilCounterFloat64) Dec(i float64) {} + +// Inc is a no-op. +func (NilCounterFloat64) Inc(i float64) {} + +// Snapshot is a no-op. +func (NilCounterFloat64) Snapshot() CounterFloat64 { return NilCounterFloat64{} } + +// StandardCounterFloat64 is the standard implementation of a CounterFloat64 and uses the +// sync.Mutex package to manage a single float64 value. +type StandardCounterFloat64 struct { + mutex sync.Mutex + count float64 +} + +// Clear sets the counter to zero. +func (c *StandardCounterFloat64) Clear() { + c.mutex.Lock() + defer c.mutex.Unlock() + c.count = 0.0 +} + +// Count returns the current value. +func (c *StandardCounterFloat64) Count() float64 { + c.mutex.Lock() + defer c.mutex.Unlock() + return c.count +} + +// Dec decrements the counter by the given amount. +func (c *StandardCounterFloat64) Dec(v float64) { + c.mutex.Lock() + defer c.mutex.Unlock() + c.count -= v +} + +// Inc increments the counter by the given amount. +func (c *StandardCounterFloat64) Inc(v float64) { + c.mutex.Lock() + defer c.mutex.Unlock() + c.count += v +} + +// Snapshot returns a read-only copy of the counter. +func (c *StandardCounterFloat64) Snapshot() CounterFloat64 { + return CounterFloat64Snapshot(c.Count()) +} diff --git a/metrics/counter_float_64_test.go b/metrics/counter_float_64_test.go new file mode 100644 index 000000000000..44d9b4c20c85 --- /dev/null +++ b/metrics/counter_float_64_test.go @@ -0,0 +1,77 @@ +package metrics + +import "testing" + +func BenchmarkCounterFloat64(b *testing.B) { + c := NewCounterFloat64() + b.ResetTimer() + for i := 0; i < b.N; i++ { + c.Inc(1.0) + } +} + +func TestCounterFloat64Clear(t *testing.T) { + c := NewCounterFloat64() + c.Inc(1.0) + c.Clear() + if count := c.Count(); count != 0 { + t.Errorf("c.Count(): 0 != %v\n", count) + } +} + +func TestCounterFloat64Dec1(t *testing.T) { + c := NewCounterFloat64() + c.Dec(1.0) + if count := c.Count(); count != -1.0 { + t.Errorf("c.Count(): -1.0 != %v\n", count) + } +} + +func TestCounterFloat64Dec2(t *testing.T) { + c := NewCounterFloat64() + c.Dec(2.0) + if count := c.Count(); count != -2.0 { + t.Errorf("c.Count(): -2.0 != %v\n", count) + } +} + +func TestCounterFloat64Inc1(t *testing.T) { + c := NewCounterFloat64() + c.Inc(1.0) + if count := c.Count(); count != 1.0 { + t.Errorf("c.Count(): 1.0 != %v\n", count) + } +} + +func TestCounterFloat64Inc2(t *testing.T) { + c := NewCounterFloat64() + c.Inc(2.0) + if count := c.Count(); count != 2.0 { + t.Errorf("c.Count(): 2.0 != %v\n", count) + } +} + +func TestCounterFloat64Snapshot(t *testing.T) { + c := NewCounterFloat64() + c.Inc(1.0) + snapshot := c.Snapshot() + c.Inc(1.0) + if count := snapshot.Count(); count != 1.0 { + t.Errorf("c.Count(): 1.0 != %v\n", count) + } +} + +func TestCounterFloat64Zero(t *testing.T) { + c := NewCounterFloat64() + if count := c.Count(); count != 0 { + t.Errorf("c.Count(): 0 != %v\n", count) + } +} + +func TestGetOrRegisterCounterFloat64(t *testing.T) { + r := NewRegistry() + NewRegisteredCounterFloat64("foo", r).Inc(47.0) + if c := GetOrRegisterCounterFloat64("foo", r); c.Count() != 47.0 { + t.Fatal(c) + } +} diff --git a/metrics/exp/exp.go b/metrics/exp/exp.go index 3ebe8cc68aad..2b04eeab271f 100644 --- a/metrics/exp/exp.go +++ b/metrics/exp/exp.go @@ -100,6 +100,11 @@ func (exp *exp) publishCounter(name string, metric metrics.Counter) { v.Set(metric.Count()) } +func (exp *exp) publishCounterFloat64(name string, metric metrics.CounterFloat64) { + v := exp.getFloat(name) + v.Set(metric.Count()) +} + func (exp *exp) publishGauge(name string, metric metrics.Gauge) { v := exp.getInt(name) v.Set(metric.Value()) @@ -167,6 +172,8 @@ func (exp *exp) syncToExpvar() { switch i := i.(type) { case metrics.Counter: exp.publishCounter(name, i) + case metrics.CounterFloat64: + exp.publishCounterFloat64(name, i) case metrics.Gauge: exp.publishGauge(name, i) case metrics.GaugeFloat64: diff --git a/metrics/graphite.go b/metrics/graphite.go index 142eec86beb4..29f72b0c4181 100644 --- a/metrics/graphite.go +++ b/metrics/graphite.go @@ -67,6 +67,8 @@ func graphite(c *GraphiteConfig) error { switch metric := i.(type) { case Counter: fmt.Fprintf(w, "%s.%s.count %d %d\n", c.Prefix, name, metric.Count(), now) + case CounterFloat64: + fmt.Fprintf(w, "%s.%s.count %f %d\n", c.Prefix, name, metric.Count(), now) case Gauge: fmt.Fprintf(w, "%s.%s.value %d %d\n", c.Prefix, name, metric.Value(), now) case GaugeFloat64: diff --git a/metrics/influxdb/influxdb.go b/metrics/influxdb/influxdb.go index 748c692e1310..ea6c3d9dff7f 100644 --- a/metrics/influxdb/influxdb.go +++ b/metrics/influxdb/influxdb.go @@ -141,6 +141,16 @@ func (r *reporter) send() error { }, Time: now, }) + case metrics.CounterFloat64: + count := metric.Count() + pts = append(pts, client.Point{ + Measurement: fmt.Sprintf("%s%s.count", namespace, name), + Tags: r.tags, + Fields: map[string]interface{}{ + "value": count, + }, + Time: now, + }) case metrics.Gauge: ms := metric.Snapshot() pts = append(pts, client.Point{ diff --git a/metrics/influxdb/influxdbv2.go b/metrics/influxdb/influxdbv2.go index bfb762196cb3..dc8a42cd5af6 100644 --- a/metrics/influxdb/influxdbv2.go +++ b/metrics/influxdb/influxdbv2.go @@ -24,8 +24,6 @@ type v2Reporter struct { client influxdb2.Client write api.WriteAPI - - cache map[string]int64 } // InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags @@ -39,7 +37,6 @@ func InfluxDBV2WithTags(r metrics.Registry, d time.Duration, endpoint string, to organization: organization, namespace: namespace, tags: tags, - cache: make(map[string]int64), } rep.client = influxdb2.NewClient(rep.endpoint, rep.token) @@ -86,17 +83,25 @@ func (r *v2Reporter) send() { switch metric := i.(type) { case metrics.Counter: v := metric.Count() - l := r.cache[name] measurement := fmt.Sprintf("%s%s.count", namespace, name) fields := map[string]interface{}{ - "value": v - l, + "value": v, } pt := influxdb2.NewPoint(measurement, r.tags, fields, now) r.write.WritePoint(pt) - r.cache[name] = v + case metrics.CounterFloat64: + v := metric.Count() + + measurement := fmt.Sprintf("%s%s.count", namespace, name) + fields := map[string]interface{}{ + "value": v, + } + + pt := influxdb2.NewPoint(measurement, r.tags, fields, now) + r.write.WritePoint(pt) case metrics.Gauge: ms := metric.Snapshot() diff --git a/metrics/librato/librato.go b/metrics/librato/librato.go index b16493413ee1..3d45f4c7be1b 100644 --- a/metrics/librato/librato.go +++ b/metrics/librato/librato.go @@ -107,6 +107,17 @@ func (rep *Reporter) BuildRequest(now time.Time, r metrics.Registry) (snapshot B } snapshot.Counters = append(snapshot.Counters, measurement) } + case metrics.CounterFloat64: + if m.Count() > 0 { + measurement[Name] = fmt.Sprintf("%s.%s", name, "count") + measurement[Value] = m.Count() + measurement[Attributes] = map[string]interface{}{ + DisplayUnitsLong: Operations, + DisplayUnitsShort: OperationsShort, + DisplayMin: "0", + } + snapshot.Counters = append(snapshot.Counters, measurement) + } case metrics.Gauge: measurement[Name] = name measurement[Value] = float64(m.Value()) diff --git a/metrics/log.go b/metrics/log.go index 0c8ea7c97123..d1ce627a8378 100644 --- a/metrics/log.go +++ b/metrics/log.go @@ -24,6 +24,9 @@ func LogScaled(r Registry, freq time.Duration, scale time.Duration, l Logger) { case Counter: l.Printf("counter %s\n", name) l.Printf(" count: %9d\n", metric.Count()) + case CounterFloat64: + l.Printf("counter %s\n", name) + l.Printf(" count: %f\n", metric.Count()) case Gauge: l.Printf("gauge %s\n", name) l.Printf(" value: %9d\n", metric.Value()) diff --git a/metrics/metrics.go b/metrics/metrics.go index ff7196b56494..c206f1692407 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -144,6 +144,9 @@ func CollectProcessMetrics(refresh time.Duration) { cpuSysLoad = GetOrRegisterGauge("system/cpu/sysload", DefaultRegistry) cpuSysWait = GetOrRegisterGauge("system/cpu/syswait", DefaultRegistry) cpuProcLoad = GetOrRegisterGauge("system/cpu/procload", DefaultRegistry) + cpuSysLoadTotal = GetOrRegisterCounterFloat64("system/cpu/sysload/total", DefaultRegistry) + cpuSysWaitTotal = GetOrRegisterCounterFloat64("system/cpu/syswait/total", DefaultRegistry) + cpuProcLoadTotal = GetOrRegisterCounterFloat64("system/cpu/procload/total", DefaultRegistry) cpuThreads = GetOrRegisterGauge("system/cpu/threads", DefaultRegistry) cpuGoroutines = GetOrRegisterGauge("system/cpu/goroutines", DefaultRegistry) cpuSchedLatency = getOrRegisterRuntimeHistogram("system/cpu/schedlatency", secondsToNs, nil) @@ -172,13 +175,17 @@ func CollectProcessMetrics(refresh time.Duration) { secondsSinceLastCollect := collectTime.Sub(lastCollectTime).Seconds() lastCollectTime = collectTime if secondsSinceLastCollect > 0 { - sysLoad := (cpustats[now].GlobalTime - cpustats[prev].GlobalTime) / secondsSinceLastCollect - sysWait := (cpustats[now].GlobalWait - cpustats[prev].GlobalWait) / secondsSinceLastCollect - procLoad := (cpustats[now].LocalTime - cpustats[prev].LocalTime) / secondsSinceLastCollect + sysLoad := cpustats[now].GlobalTime - cpustats[prev].GlobalTime + sysWait := cpustats[now].GlobalWait - cpustats[prev].GlobalWait + procLoad := cpustats[now].LocalTime - cpustats[prev].LocalTime // Convert to integer percentage. - cpuSysLoad.Update(int64(sysLoad * 100)) - cpuSysWait.Update(int64(sysWait * 100)) - cpuProcLoad.Update(int64(procLoad * 100)) + cpuSysLoad.Update(int64(sysLoad / secondsSinceLastCollect * 100)) + cpuSysWait.Update(int64(sysWait / secondsSinceLastCollect * 100)) + cpuProcLoad.Update(int64(procLoad / secondsSinceLastCollect * 100)) + // increment counters (ms) + cpuSysLoadTotal.Inc(sysLoad) + cpuSysWaitTotal.Inc(sysWait) + cpuProcLoadTotal.Inc(procLoad) } // Threads diff --git a/metrics/metrics_test.go b/metrics/metrics_test.go index e3fde1ea62ce..534c44139b36 100644 --- a/metrics/metrics_test.go +++ b/metrics/metrics_test.go @@ -18,6 +18,7 @@ func TestReadRuntimeValues(t *testing.T) { func BenchmarkMetrics(b *testing.B) { r := NewRegistry() c := NewRegisteredCounter("counter", r) + cf := NewRegisteredCounterFloat64("counterfloat64", r) g := NewRegisteredGauge("gauge", r) gf := NewRegisteredGaugeFloat64("gaugefloat64", r) h := NewRegisteredHistogram("histogram", r, NewUniformSample(100)) @@ -71,6 +72,7 @@ func BenchmarkMetrics(b *testing.B) { //log.Println("go", i) for i := 0; i < b.N; i++ { c.Inc(1) + cf.Inc(1.0) g.Update(int64(i)) gf.Update(float64(i)) h.Update(int64(i)) diff --git a/metrics/opentsdb.go b/metrics/opentsdb.go index 3fde55454ba9..c9fd2e75d5e5 100644 --- a/metrics/opentsdb.go +++ b/metrics/opentsdb.go @@ -71,6 +71,8 @@ func openTSDB(c *OpenTSDBConfig) error { switch metric := i.(type) { case Counter: fmt.Fprintf(w, "put %s.%s.count %d %d host=%s\n", c.Prefix, name, now, metric.Count(), shortHostname) + case CounterFloat64: + fmt.Fprintf(w, "put %s.%s.count %d %f host=%s\n", c.Prefix, name, now, metric.Count(), shortHostname) case Gauge: fmt.Fprintf(w, "put %s.%s.value %d %d host=%s\n", c.Prefix, name, now, metric.Value(), shortHostname) case GaugeFloat64: diff --git a/metrics/prometheus/collector.go b/metrics/prometheus/collector.go index e8d5e4f5d1ea..2bd9bf22ccae 100644 --- a/metrics/prometheus/collector.go +++ b/metrics/prometheus/collector.go @@ -50,6 +50,10 @@ func (c *collector) addCounter(name string, m metrics.Counter) { c.writeGaugeCounter(name, m.Count()) } +func (c *collector) addCounterFloat64(name string, m metrics.CounterFloat64) { + c.writeGaugeCounter(name, m.Count()) +} + func (c *collector) addGauge(name string, m metrics.Gauge) { c.writeGaugeCounter(name, m.Value()) } diff --git a/metrics/prometheus/collector_test.go b/metrics/prometheus/collector_test.go index 43f2f804d32e..ff87c8e765e1 100644 --- a/metrics/prometheus/collector_test.go +++ b/metrics/prometheus/collector_test.go @@ -20,6 +20,10 @@ func TestCollector(t *testing.T) { counter.Inc(12345) c.addCounter("test/counter", counter) + counterfloat64 := metrics.NewCounterFloat64() + counterfloat64.Inc(54321.98) + c.addCounterFloat64("test/counter_float64", counterfloat64) + gauge := metrics.NewGauge() gauge.Update(23456) c.addGauge("test/gauge", gauge) @@ -61,6 +65,9 @@ func TestCollector(t *testing.T) { const expectedOutput = `# TYPE test_counter gauge test_counter 12345 +# TYPE test_counter_float64 gauge +test_counter_float64 54321.98 + # TYPE test_gauge gauge test_gauge 23456 diff --git a/metrics/prometheus/prometheus.go b/metrics/prometheus/prometheus.go index c8408d8cab85..d966fa9a8666 100644 --- a/metrics/prometheus/prometheus.go +++ b/metrics/prometheus/prometheus.go @@ -45,6 +45,8 @@ func Handler(reg metrics.Registry) http.Handler { switch m := i.(type) { case metrics.Counter: c.addCounter(name, m.Snapshot()) + case metrics.CounterFloat64: + c.addCounterFloat64(name, m.Snapshot()) case metrics.Gauge: c.addGauge(name, m.Snapshot()) case metrics.GaugeFloat64: diff --git a/metrics/registry.go b/metrics/registry.go index c5435adf2402..4c62248351a7 100644 --- a/metrics/registry.go +++ b/metrics/registry.go @@ -120,6 +120,8 @@ func (r *StandardRegistry) GetAll() map[string]map[string]interface{} { switch metric := i.(type) { case Counter: values["count"] = metric.Count() + case CounterFloat64: + values["count"] = metric.Count() case Gauge: values["value"] = metric.Value() case GaugeFloat64: @@ -196,7 +198,7 @@ func (r *StandardRegistry) register(name string, i interface{}) error { return DuplicateMetric(name) } switch i.(type) { - case Counter, Gauge, GaugeFloat64, Healthcheck, Histogram, Meter, Timer, ResettingTimer: + case Counter, CounterFloat64, Gauge, GaugeFloat64, Healthcheck, Histogram, Meter, Timer, ResettingTimer: r.metrics[name] = i } return nil diff --git a/metrics/syslog.go b/metrics/syslog.go index 551a2bd0f072..f23b07e199f3 100644 --- a/metrics/syslog.go +++ b/metrics/syslog.go @@ -17,6 +17,8 @@ func Syslog(r Registry, d time.Duration, w *syslog.Writer) { switch metric := i.(type) { case Counter: w.Info(fmt.Sprintf("counter %s: count: %d", name, metric.Count())) + case CounterFloat64: + w.Info(fmt.Sprintf("counter %s: count: %f", name, metric.Count())) case Gauge: w.Info(fmt.Sprintf("gauge %s: value: %d", name, metric.Value())) case GaugeFloat64: diff --git a/metrics/writer.go b/metrics/writer.go index 88521a80d9d7..256fbd14c9b9 100644 --- a/metrics/writer.go +++ b/metrics/writer.go @@ -29,6 +29,9 @@ func WriteOnce(r Registry, w io.Writer) { case Counter: fmt.Fprintf(w, "counter %s\n", namedMetric.name) fmt.Fprintf(w, " count: %9d\n", metric.Count()) + case CounterFloat64: + fmt.Fprintf(w, "counter %s\n", namedMetric.name) + fmt.Fprintf(w, " count: %f\n", metric.Count()) case Gauge: fmt.Fprintf(w, "gauge %s\n", namedMetric.name) fmt.Fprintf(w, " value: %9d\n", metric.Value())