From 50ed5569cd9872c5e1952172a7d698a44ab773c0 Mon Sep 17 00:00:00 2001 From: Bartek Plotka Date: Sun, 15 Dec 2019 01:49:05 +0000 Subject: [PATCH] Created transactional GaugeVec for easy atomic Gauge change. Signed-off-by: Bartek Plotka --- pkg/extprom/tx_gauge.go | 89 +++++++++++++++++ pkg/extprom/tx_gauge_test.go | 179 +++++++++++++++++++++++++++++++++++ 2 files changed, 268 insertions(+) create mode 100644 pkg/extprom/tx_gauge.go create mode 100644 pkg/extprom/tx_gauge_test.go diff --git a/pkg/extprom/tx_gauge.go b/pkg/extprom/tx_gauge.go new file mode 100644 index 0000000000..d0ee7342e6 --- /dev/null +++ b/pkg/extprom/tx_gauge.go @@ -0,0 +1,89 @@ +package extprom + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" +) + +type TxGaugeVec struct { + current *prometheus.GaugeVec + mtx sync.Mutex + newMetricVal func() *prometheus.GaugeVec + + tx *prometheus.GaugeVec +} + +// NewTxGaugeVec is a prometheus.GaugeVec that allows to start atomic metric value transaction. +// It might be useful if long process that wants to update a GaugeVec but wants to build/accumulate those metrics +// in a concurrent way without exposing partial state to Prometheus. +// Caller can also use this as normal GaugeVec. +// +// Additionally it allows to init LabelValues on each transaction. +// NOTE: This is quite naive implementation creating new prometheus.GaugeVec on each `ResetTx`, use wisely. +func NewTxGaugeVec(opts prometheus.GaugeOpts, labelNames []string, initLabelValues ...[]string) *TxGaugeVec { + f := func() *prometheus.GaugeVec { + g := prometheus.NewGaugeVec(opts, labelNames) + for _, vals := range initLabelValues { + g.WithLabelValues(vals...) + } + return g + } + return &TxGaugeVec{ + current: f(), + newMetricVal: f, + } +} + +// ResetTx starts new transaction. Not goroutine-safe. +func (tx *TxGaugeVec) ResetTx() { + tx.tx = tx.newMetricVal() +} + +// Submit atomically and fully applies new values from existing transaction GaugeVec. Not goroutine-safe. +func (tx *TxGaugeVec) Submit() { + if tx.tx == nil { + return + } + + tx.mtx.Lock() + tx.current = tx.tx + tx.mtx.Unlock() +} + +// Describe is used in Register. +func (tx *TxGaugeVec) Describe(ch chan<- *prometheus.Desc) { + tx.mtx.Lock() + defer tx.mtx.Unlock() + + tx.current.Describe(ch) +} + +// Collect is used by Registered. +func (tx *TxGaugeVec) Collect(ch chan<- prometheus.Metric) { + tx.mtx.Lock() + defer tx.mtx.Unlock() + + tx.current.Collect(ch) +} + +// With works as GetMetricWith, but panics where GetMetricWithLabels would have +// returned an error. Not returning an error allows shortcuts like +// myVec.With(prometheus.Labels{"code": "404", "method": "GET"}).Add(42) +func (tx *TxGaugeVec) With(labels prometheus.Labels) prometheus.Gauge { + if tx.tx == nil { + tx.ResetTx() + } + return tx.tx.With(labels) +} + +// WithLabelValues works as GetMetricWithLabelValues, but panics where +// GetMetricWithLabelValues would have returned an error. Not returning an +// error allows shortcuts like +// myVec.WithLabelValues("404", "GET").Add(42) +func (tx *TxGaugeVec) WithLabelValues(lvs ...string) prometheus.Gauge { + if tx.tx == nil { + tx.ResetTx() + } + return tx.tx.WithLabelValues(lvs...) +} diff --git a/pkg/extprom/tx_gauge_test.go b/pkg/extprom/tx_gauge_test.go new file mode 100644 index 0000000000..a2c3a5988c --- /dev/null +++ b/pkg/extprom/tx_gauge_test.go @@ -0,0 +1,179 @@ +package extprom + +import ( + "fmt" + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestTxGaugeVec(t *testing.T) { + g := NewTxGaugeVec(prometheus.GaugeOpts{ + Name: "metric", + }, []string{"a", "b"}, []string{"a1", "b1"}, []string{"a2", "b2"}) + + for _, tcase := range []struct { + name string + txUse func() + exp map[string]float64 + }{ + { + name: "nothing", + txUse: func() {}, + exp: map[string]float64{ + "name:\"a\" value:\"a1\" ,name:\"b\" value:\"b1\" ": 0, + "name:\"a\" value:\"a2\" ,name:\"b\" value:\"b2\" ": 0, + }, + }, + { + name: "change a=a1,b=b1", + txUse: func() { + g.WithLabelValues("a1", "b1").Inc() + g.WithLabelValues("a1", "b1").Add(0.3) + }, + exp: map[string]float64{ + "name:\"a\" value:\"a1\" ,name:\"b\" value:\"b1\" ": 1.3, + "name:\"a\" value:\"a2\" ,name:\"b\" value:\"b2\" ": 0, + }, + }, + { + name: "change a=a1,b=b1 again, should return same result", + txUse: func() { + g.WithLabelValues("a1", "b1").Inc() + g.WithLabelValues("a1", "b1").Add(-10) + g.WithLabelValues("a1", "b1").Add(10.3) + }, + exp: map[string]float64{ + "name:\"a\" value:\"a1\" ,name:\"b\" value:\"b1\" ": 1.3000000000000007, // Say hi to float comparisons. + "name:\"a\" value:\"a2\" ,name:\"b\" value:\"b2\" ": 0, + }, + }, + { + name: "change a=a1,b=b1 again, should return same result", + txUse: func() { + g.WithLabelValues("a1", "b1").Inc() + g.WithLabelValues("a1", "b1").Add(-10) + g.WithLabelValues("a1", "b1").Set(1.3) + }, + exp: map[string]float64{ + "name:\"a\" value:\"a1\" ,name:\"b\" value:\"b1\" ": 1.3, + "name:\"a\" value:\"a2\" ,name:\"b\" value:\"b2\" ": 0, + }, + }, + { + name: "nothing again", + txUse: func() {}, + exp: map[string]float64{ + "name:\"a\" value:\"a1\" ,name:\"b\" value:\"b1\" ": 0, + "name:\"a\" value:\"a2\" ,name:\"b\" value:\"b2\" ": 0, + }, + }, + { + name: "change a=aX,b=b1", + txUse: func() { + g.WithLabelValues("aX", "b1").Set(500.2) + }, + exp: map[string]float64{ + "name:\"a\" value:\"a1\" ,name:\"b\" value:\"b1\" ": 0, + "name:\"a\" value:\"a2\" ,name:\"b\" value:\"b2\" ": 0, + "name:\"a\" value:\"aX\" ,name:\"b\" value:\"b1\" ": 500.2, + }, + }, + { + name: "change a=aX,b=b1", + txUse: func() { + g.WithLabelValues("aX", "b1").Set(500.2) + }, + exp: map[string]float64{ + "name:\"a\" value:\"a1\" ,name:\"b\" value:\"b1\" ": 0, + "name:\"a\" value:\"a2\" ,name:\"b\" value:\"b2\" ": 0, + "name:\"a\" value:\"aX\" ,name:\"b\" value:\"b1\" ": 500.2, + }, + }, + { + name: "nothing again", + txUse: func() {}, + exp: map[string]float64{ + "name:\"a\" value:\"a1\" ,name:\"b\" value:\"b1\" ": 0, + "name:\"a\" value:\"a2\" ,name:\"b\" value:\"b2\" ": 0, + }, + }, + { + name: "change 3 metrics", + txUse: func() { + g.WithLabelValues("a1", "b1").Inc() + g.WithLabelValues("a2", "b2").Add(-2) + g.WithLabelValues("a3", "b3").Set(1.1) + }, + exp: map[string]float64{ + "name:\"a\" value:\"a1\" ,name:\"b\" value:\"b1\" ": 1, + "name:\"a\" value:\"a2\" ,name:\"b\" value:\"b2\" ": -2, + "name:\"a\" value:\"a3\" ,name:\"b\" value:\"b3\" ": 1.1, + }, + }, + { + name: "nothing again", + txUse: func() {}, + exp: map[string]float64{ + "name:\"a\" value:\"a1\" ,name:\"b\" value:\"b1\" ": 0, + "name:\"a\" value:\"a2\" ,name:\"b\" value:\"b2\" ": 0, + }, + }, + } { + if ok := t.Run(tcase.name, func(t *testing.T) { + g.ResetTx() + + tcase.txUse() + g.Submit() + + testutil.Equals(t, tcase.exp, toFloat64(t, g)) + + }); !ok { + return + } + } +} + +// toFloat64 is prometheus/client_golang/prometheus/testutil.ToFloat64 version that works with multiple labelnames. +// NOTE: Be careful on float comparison. +func toFloat64(t *testing.T, c prometheus.Collector) map[string]float64 { + var ( + mChan = make(chan prometheus.Metric) + exp = map[string]float64{} + ) + + go func() { + c.Collect(mChan) + close(mChan) + }() + + for m := range mChan { + pb := &dto.Metric{} + testutil.Ok(t, m.Write(pb)) + if pb.Gauge != nil { + exp[lbToString(pb.GetLabel())] = pb.Gauge.GetValue() + continue + } + if pb.Counter != nil { + exp[lbToString(pb.GetLabel())] = pb.Counter.GetValue() + continue + } + if pb.Untyped != nil { + exp[lbToString(pb.GetLabel())] = pb.Untyped.GetValue() + } + panic(fmt.Errorf("collected a non-gauge/counter/untyped metric: %s", pb)) + } + + return exp +} + +func lbToString(pairs []*dto.LabelPair) string { + var ret []string + for _, r := range pairs { + ret = append(ret, r.String()) + } + return strings.Join(ret, ",") +}