Skip to content

Commit

Permalink
Merge pull request #347 from grafana/custom-registry
Browse files Browse the repository at this point in the history
Pass around custom registry for registering exporter metrics
  • Loading branch information
Matthias Rampke authored Nov 24, 2020
2 parents dcd95d0 + b5deeda commit 8772c03
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 65 deletions.
2 changes: 1 addition & 1 deletion bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ mappings:
events := make(chan event.Events)
defer close(events)
go func() {
ex := exporter.NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := exporter.NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()

Expand Down
3 changes: 2 additions & 1 deletion exporter_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/go-kit/kit/log"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/exporter"
"github.com/prometheus/statsd_exporter/pkg/line"
Expand Down Expand Up @@ -171,7 +172,7 @@ mappings:
b.Fatalf("Config load error: %s %s", config, err)
}

ex := exporter.NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := exporter.NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)

// reset benchmark timer to not measure startup costs
b.ResetTimer()
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func main() {

}

mapper := &mapper.MetricMapper{MappingsCount: mappingsCount}
mapper := &mapper.MetricMapper{Registerer: prometheus.DefaultRegisterer, MappingsCount: mappingsCount}
if *mappingConfig != "" {
err := mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption)
if err != nil {
Expand All @@ -458,7 +458,7 @@ func main() {
mapper.InitCache(*cacheSize, cacheOption)
}

exporter := exporter.NewExporter(mapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
exporter := exporter.NewExporter(prometheus.DefaultRegisterer, mapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)

if *checkConfig {
level.Info(logger).Log("msg", "Configuration check successful, exiting")
Expand Down
4 changes: 2 additions & 2 deletions pkg/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,10 @@ func (b *Exporter) handleEvent(thisEvent event.Event) {
}
}

func NewExporter(mapper *mapper.MetricMapper, logger log.Logger, eventsActions *prometheus.CounterVec, eventsUnmapped prometheus.Counter, errorEventStats *prometheus.CounterVec, eventStats *prometheus.CounterVec, conflictingEventStats *prometheus.CounterVec, metricsCount *prometheus.GaugeVec) *Exporter {
func NewExporter(reg prometheus.Registerer, mapper *mapper.MetricMapper, logger log.Logger, eventsActions *prometheus.CounterVec, eventsUnmapped prometheus.Counter, errorEventStats *prometheus.CounterVec, eventStats *prometheus.CounterVec, conflictingEventStats *prometheus.CounterVec, metricsCount *prometheus.GaugeVec) *Exporter {
return &Exporter{
Mapper: mapper,
Registry: registry.NewRegistry(mapper),
Registry: registry.NewRegistry(reg, mapper),
Logger: logger,
EventsActions: eventsActions,
EventsUnmapped: eventsUnmapped,
Expand Down
24 changes: 12 additions & 12 deletions pkg/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func TestNegativeCounter(t *testing.T) {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)

ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)

updated := getTelemetryCounterValue(errorCounter)
Expand Down Expand Up @@ -265,7 +265,7 @@ mappings:
t.Fatalf("Config load error: %s %s", config, err)
}

ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)

metrics, err := prometheus.DefaultGatherer.Gather()
Expand Down Expand Up @@ -323,7 +323,7 @@ mappings:
t.Fatalf("Config load error: %s %s", config, err)
}

ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)

metrics, err := prometheus.DefaultGatherer.Gather()
Expand Down Expand Up @@ -538,7 +538,7 @@ mappings:
events <- s.in
close(events)
}()
ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)

metrics, err := prometheus.DefaultGatherer.Gather()
Expand Down Expand Up @@ -593,7 +593,7 @@ mappings:
errorCounter := errorEventStats.WithLabelValues("empty_metric_name")
prev := getTelemetryCounterValue(errorCounter)

ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)

updated := getTelemetryCounterValue(errorCounter)
Expand Down Expand Up @@ -660,7 +660,7 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)

ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}

Expand All @@ -674,7 +674,7 @@ func TestSummaryWithQuantilesEmptyMapping(t *testing.T) {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)

ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()

Expand Down Expand Up @@ -718,7 +718,7 @@ func TestHistogramUnits(t *testing.T) {
go func() {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Mapper.Defaults.ObserverType = mapper.ObserverTypeHistogram
ex.Listen(events)
}()
Expand Down Expand Up @@ -755,7 +755,7 @@ func TestCounterIncrement(t *testing.T) {
go func() {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()

Expand Down Expand Up @@ -864,7 +864,7 @@ mappings:
events := make(chan event.Events)
defer close(events)
go func() {
ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()

Expand Down Expand Up @@ -952,7 +952,7 @@ mappings:
}

func TestHashLabelNames(t *testing.T) {
r := registry.NewRegistry(nil)
r := registry.NewRegistry(prometheus.DefaultRegisterer, nil)
// Validate value hash changes and name has doesn't when just the value changes.
hash1, _ := r.HashLabels(map[string]string{
"label": "value1",
Expand Down Expand Up @@ -1113,7 +1113,7 @@ func BenchmarkHashNameAndLabels(b *testing.B) {
},
}

r := registry.NewRegistry(nil)
r := registry.NewRegistry(prometheus.DefaultRegisterer, nil)
for _, s := range scenarios {
b.Run(s.name, func(b *testing.B) {
for n := 0; n < b.N; n++ {
Expand Down
21 changes: 11 additions & 10 deletions pkg/mapper/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ var (
)

type MetricMapper struct {
Defaults mapperConfigDefaults `yaml:"defaults"`
Mappings []MetricMapping `yaml:"mappings"`
FSM *fsm.FSM
doFSM bool
doRegex bool
cache MetricMapperCache
mutex sync.RWMutex
Registerer prometheus.Registerer
Defaults mapperConfigDefaults `yaml:"defaults"`
Mappings []MetricMapping `yaml:"mappings"`
FSM *fsm.FSM
doFSM bool
doRegex bool
cache MetricMapperCache
mutex sync.RWMutex

MappingsCount prometheus.Gauge
}
Expand Down Expand Up @@ -252,7 +253,7 @@ func (m *MetricMapper) InitFromFile(fileName string, cacheSize int, options ...C

func (m *MetricMapper) InitCache(cacheSize int, options ...CacheOption) {
if cacheSize == 0 {
m.cache = NewMetricMapperNoopCache()
m.cache = NewMetricMapperNoopCache(m.Registerer)
} else {
o := cacheOptions{
cacheType: "lru",
Expand All @@ -267,9 +268,9 @@ func (m *MetricMapper) InitCache(cacheSize int, options ...CacheOption) {
)
switch o.cacheType {
case "lru":
cache, err = NewMetricMapperCache(cacheSize)
cache, err = NewMetricMapperCache(m.Registerer, cacheSize)
case "random":
cache, err = NewMetricMapperRRCache(cacheSize)
cache, err = NewMetricMapperRRCache(m.Registerer, cacheSize)
default:
err = fmt.Errorf("unsupported cache type %q", o.cacheType)
}
Expand Down
70 changes: 41 additions & 29 deletions pkg/mapper/mapper_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,41 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

var (
cacheLength = prometheus.NewGauge(
type CacheMetrics struct {
CacheLength prometheus.Gauge
CacheGetsTotal prometheus.Counter
CacheHitsTotal prometheus.Counter
}

func NewCacheMetrics(reg prometheus.Registerer) *CacheMetrics {
var m CacheMetrics

m.CacheLength = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "statsd_metric_mapper_cache_length",
Help: "The count of unique metrics currently cached.",
},
)
cacheGetsTotal = prometheus.NewCounter(
m.CacheGetsTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_metric_mapper_cache_gets_total",
Help: "The count of total metric cache gets.",
},
)
cacheHitsTotal = prometheus.NewCounter(
m.CacheHitsTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_metric_mapper_cache_hits_total",
Help: "The count of total metric cache hits.",
},
)
)

if reg != nil {
reg.MustRegister(m.CacheLength)
reg.MustRegister(m.CacheGetsTotal)
reg.MustRegister(m.CacheHitsTotal)
}
return &m
}

type cacheOptions struct {
cacheType string
Expand Down Expand Up @@ -67,26 +82,28 @@ type MetricMapperCache interface {

type MetricMapperLRUCache struct {
MetricMapperCache
cache *lru.Cache
cache *lru.Cache
metrics *CacheMetrics
}

type MetricMapperNoopCache struct {
MetricMapperCache
metrics *CacheMetrics
}

func NewMetricMapperCache(size int) (*MetricMapperLRUCache, error) {
cacheLength.Set(0)
func NewMetricMapperCache(reg prometheus.Registerer, size int) (*MetricMapperLRUCache, error) {
metrics := NewCacheMetrics(reg)
cache, err := lru.New(size)
if err != nil {
return &MetricMapperLRUCache{}, err
}
return &MetricMapperLRUCache{cache: cache}, nil
return &MetricMapperLRUCache{metrics: metrics, cache: cache}, nil
}

func (m *MetricMapperLRUCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) {
cacheGetsTotal.Inc()
m.metrics.CacheGetsTotal.Inc()
if result, ok := m.cache.Get(formatKey(metricString, metricType)); ok {
cacheHitsTotal.Inc()
m.metrics.CacheHitsTotal.Inc()
return result.(*MetricMapperCacheResult), true
} else {
return nil, false
Expand All @@ -104,16 +121,15 @@ func (m *MetricMapperLRUCache) AddMiss(metricString string, metricType MetricTyp
}

func (m *MetricMapperLRUCache) trackCacheLength() {
cacheLength.Set(float64(m.cache.Len()))
m.metrics.CacheLength.Set(float64(m.cache.Len()))
}

func formatKey(metricString string, metricType MetricType) string {
return string(metricType) + "." + metricString
}

func NewMetricMapperNoopCache() *MetricMapperNoopCache {
cacheLength.Set(0)
return &MetricMapperNoopCache{}
func NewMetricMapperNoopCache(reg prometheus.Registerer) *MetricMapperNoopCache {
return &MetricMapperNoopCache{metrics: NewCacheMetrics(reg)}
}

func (m *MetricMapperNoopCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) {
Expand All @@ -130,16 +146,18 @@ func (m *MetricMapperNoopCache) AddMiss(metricString string, metricType MetricTy

type MetricMapperRRCache struct {
MetricMapperCache
lock sync.RWMutex
size int
items map[string]*MetricMapperCacheResult
lock sync.RWMutex
size int
items map[string]*MetricMapperCacheResult
metrics *CacheMetrics
}

func NewMetricMapperRRCache(size int) (*MetricMapperRRCache, error) {
cacheLength.Set(0)
func NewMetricMapperRRCache(reg prometheus.Registerer, size int) (*MetricMapperRRCache, error) {
metrics := NewCacheMetrics(reg)
c := &MetricMapperRRCache{
items: make(map[string]*MetricMapperCacheResult, size+1),
size: size,
items: make(map[string]*MetricMapperCacheResult, size+1),
size: size,
metrics: metrics,
}
return c, nil
}
Expand Down Expand Up @@ -188,11 +206,5 @@ func (m *MetricMapperRRCache) trackCacheLength() {
m.lock.RLock()
length := len(m.items)
m.lock.RUnlock()
cacheLength.Set(float64(length))
}

func init() {
prometheus.MustRegister(cacheLength)
prometheus.MustRegister(cacheGetsTotal)
prometheus.MustRegister(cacheHitsTotal)
m.metrics.CacheLength.Set(float64(length))
}
18 changes: 10 additions & 8 deletions pkg/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,22 @@ func (u uncheckedCollector) Collect(c chan<- prometheus.Metric) {
}

type Registry struct {
Metrics map[string]metrics.Metric
Mapper *mapper.MetricMapper
Registerer prometheus.Registerer
Metrics map[string]metrics.Metric
Mapper *mapper.MetricMapper
// The below value and label variables are allocated in the registry struct
// so that we don't have to allocate them every time have to compute a label
// hash.
ValueBuf, NameBuf bytes.Buffer
Hasher hash.Hash64
}

func NewRegistry(mapper *mapper.MetricMapper) *Registry {
func NewRegistry(reg prometheus.Registerer, mapper *mapper.MetricMapper) *Registry {
return &Registry{
Metrics: make(map[string]metrics.Metric),
Mapper: mapper,
Hasher: fnv.New64a(),
Registerer: reg,
Metrics: make(map[string]metrics.Metric),
Mapper: mapper,
Hasher: fnv.New64a(),
}
}

Expand Down Expand Up @@ -170,7 +172,7 @@ func (r *Registry) GetCounter(metricName string, labels prometheus.Labels, help
Help: help,
}, labelNames)

if err := prometheus.Register(uncheckedCollector{counterVec}); err != nil {
if err := r.Registerer.Register(uncheckedCollector{counterVec}); err != nil {
return nil, err
}
} else {
Expand Down Expand Up @@ -206,7 +208,7 @@ func (r *Registry) GetGauge(metricName string, labels prometheus.Labels, help st
Help: help,
}, labelNames)

if err := prometheus.Register(uncheckedCollector{gaugeVec}); err != nil {
if err := r.Registerer.Register(uncheckedCollector{gaugeVec}); err != nil {
return nil, err
}
} else {
Expand Down

0 comments on commit 8772c03

Please sign in to comment.