Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass around custom registry for registering exporter metrics #347

Merged
merged 1 commit into from
Nov 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ mappings:
events := make(chan event.Events)
defer close(events)
go func() {
ex := exporter.NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := exporter.NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()

Expand Down
3 changes: 2 additions & 1 deletion exporter_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/go-kit/kit/log"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/exporter"
"github.com/prometheus/statsd_exporter/pkg/line"
Expand Down Expand Up @@ -171,7 +172,7 @@ mappings:
b.Fatalf("Config load error: %s %s", config, err)
}

ex := exporter.NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := exporter.NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)

// reset benchmark timer to not measure startup costs
b.ResetTimer()
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func main() {

}

mapper := &mapper.MetricMapper{MappingsCount: mappingsCount}
mapper := &mapper.MetricMapper{Registerer: prometheus.DefaultRegisterer, MappingsCount: mappingsCount}
if *mappingConfig != "" {
err := mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption)
if err != nil {
Expand All @@ -458,7 +458,7 @@ func main() {
mapper.InitCache(*cacheSize, cacheOption)
}

exporter := exporter.NewExporter(mapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
exporter := exporter.NewExporter(prometheus.DefaultRegisterer, mapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)

if *checkConfig {
level.Info(logger).Log("msg", "Configuration check successful, exiting")
Expand Down
4 changes: 2 additions & 2 deletions pkg/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,10 @@ func (b *Exporter) handleEvent(thisEvent event.Event) {
}
}

func NewExporter(mapper *mapper.MetricMapper, logger log.Logger, eventsActions *prometheus.CounterVec, eventsUnmapped prometheus.Counter, errorEventStats *prometheus.CounterVec, eventStats *prometheus.CounterVec, conflictingEventStats *prometheus.CounterVec, metricsCount *prometheus.GaugeVec) *Exporter {
func NewExporter(reg prometheus.Registerer, mapper *mapper.MetricMapper, logger log.Logger, eventsActions *prometheus.CounterVec, eventsUnmapped prometheus.Counter, errorEventStats *prometheus.CounterVec, eventStats *prometheus.CounterVec, conflictingEventStats *prometheus.CounterVec, metricsCount *prometheus.GaugeVec) *Exporter {
return &Exporter{
Mapper: mapper,
Registry: registry.NewRegistry(mapper),
Registry: registry.NewRegistry(reg, mapper),
Logger: logger,
EventsActions: eventsActions,
EventsUnmapped: eventsUnmapped,
Expand Down
24 changes: 12 additions & 12 deletions pkg/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func TestNegativeCounter(t *testing.T) {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)

ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)

updated := getTelemetryCounterValue(errorCounter)
Expand Down Expand Up @@ -265,7 +265,7 @@ mappings:
t.Fatalf("Config load error: %s %s", config, err)
}

ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)

metrics, err := prometheus.DefaultGatherer.Gather()
Expand Down Expand Up @@ -323,7 +323,7 @@ mappings:
t.Fatalf("Config load error: %s %s", config, err)
}

ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)

metrics, err := prometheus.DefaultGatherer.Gather()
Expand Down Expand Up @@ -538,7 +538,7 @@ mappings:
events <- s.in
close(events)
}()
ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)

metrics, err := prometheus.DefaultGatherer.Gather()
Expand Down Expand Up @@ -593,7 +593,7 @@ mappings:
errorCounter := errorEventStats.WithLabelValues("empty_metric_name")
prev := getTelemetryCounterValue(errorCounter)

ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)

updated := getTelemetryCounterValue(errorCounter)
Expand Down Expand Up @@ -660,7 +660,7 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)

ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}

Expand All @@ -674,7 +674,7 @@ func TestSummaryWithQuantilesEmptyMapping(t *testing.T) {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)

ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()

Expand Down Expand Up @@ -718,7 +718,7 @@ func TestHistogramUnits(t *testing.T) {
go func() {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Mapper.Defaults.ObserverType = mapper.ObserverTypeHistogram
ex.Listen(events)
}()
Expand Down Expand Up @@ -755,7 +755,7 @@ func TestCounterIncrement(t *testing.T) {
go func() {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()

Expand Down Expand Up @@ -864,7 +864,7 @@ mappings:
events := make(chan event.Events)
defer close(events)
go func() {
ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex := NewExporter(prometheus.DefaultRegisterer, testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()

Expand Down Expand Up @@ -952,7 +952,7 @@ mappings:
}

func TestHashLabelNames(t *testing.T) {
r := registry.NewRegistry(nil)
r := registry.NewRegistry(prometheus.DefaultRegisterer, nil)
// Validate value hash changes and name has doesn't when just the value changes.
hash1, _ := r.HashLabels(map[string]string{
"label": "value1",
Expand Down Expand Up @@ -1113,7 +1113,7 @@ func BenchmarkHashNameAndLabels(b *testing.B) {
},
}

r := registry.NewRegistry(nil)
r := registry.NewRegistry(prometheus.DefaultRegisterer, nil)
for _, s := range scenarios {
b.Run(s.name, func(b *testing.B) {
for n := 0; n < b.N; n++ {
Expand Down
21 changes: 11 additions & 10 deletions pkg/mapper/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ var (
)

type MetricMapper struct {
Defaults mapperConfigDefaults `yaml:"defaults"`
Mappings []MetricMapping `yaml:"mappings"`
FSM *fsm.FSM
doFSM bool
doRegex bool
cache MetricMapperCache
mutex sync.RWMutex
Registerer prometheus.Registerer
Defaults mapperConfigDefaults `yaml:"defaults"`
Mappings []MetricMapping `yaml:"mappings"`
FSM *fsm.FSM
doFSM bool
doRegex bool
cache MetricMapperCache
mutex sync.RWMutex

MappingsCount prometheus.Gauge
}
Expand Down Expand Up @@ -252,7 +253,7 @@ func (m *MetricMapper) InitFromFile(fileName string, cacheSize int, options ...C

func (m *MetricMapper) InitCache(cacheSize int, options ...CacheOption) {
if cacheSize == 0 {
m.cache = NewMetricMapperNoopCache()
m.cache = NewMetricMapperNoopCache(m.Registerer)
} else {
o := cacheOptions{
cacheType: "lru",
Expand All @@ -267,9 +268,9 @@ func (m *MetricMapper) InitCache(cacheSize int, options ...CacheOption) {
)
switch o.cacheType {
case "lru":
cache, err = NewMetricMapperCache(cacheSize)
cache, err = NewMetricMapperCache(m.Registerer, cacheSize)
case "random":
cache, err = NewMetricMapperRRCache(cacheSize)
cache, err = NewMetricMapperRRCache(m.Registerer, cacheSize)
default:
err = fmt.Errorf("unsupported cache type %q", o.cacheType)
}
Expand Down
70 changes: 41 additions & 29 deletions pkg/mapper/mapper_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,41 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

var (
cacheLength = prometheus.NewGauge(
type CacheMetrics struct {
CacheLength prometheus.Gauge
CacheGetsTotal prometheus.Counter
CacheHitsTotal prometheus.Counter
}

func NewCacheMetrics(reg prometheus.Registerer) *CacheMetrics {
var m CacheMetrics

m.CacheLength = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "statsd_metric_mapper_cache_length",
Help: "The count of unique metrics currently cached.",
},
)
cacheGetsTotal = prometheus.NewCounter(
m.CacheGetsTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_metric_mapper_cache_gets_total",
Help: "The count of total metric cache gets.",
},
)
cacheHitsTotal = prometheus.NewCounter(
m.CacheHitsTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_metric_mapper_cache_hits_total",
Help: "The count of total metric cache hits.",
},
)
)

if reg != nil {
reg.MustRegister(m.CacheLength)
reg.MustRegister(m.CacheGetsTotal)
reg.MustRegister(m.CacheHitsTotal)
}
return &m
}

type cacheOptions struct {
cacheType string
Expand Down Expand Up @@ -67,26 +82,28 @@ type MetricMapperCache interface {

type MetricMapperLRUCache struct {
MetricMapperCache
cache *lru.Cache
cache *lru.Cache
metrics *CacheMetrics
}

type MetricMapperNoopCache struct {
MetricMapperCache
metrics *CacheMetrics
}

func NewMetricMapperCache(size int) (*MetricMapperLRUCache, error) {
cacheLength.Set(0)
func NewMetricMapperCache(reg prometheus.Registerer, size int) (*MetricMapperLRUCache, error) {
metrics := NewCacheMetrics(reg)
cache, err := lru.New(size)
if err != nil {
return &MetricMapperLRUCache{}, err
}
return &MetricMapperLRUCache{cache: cache}, nil
return &MetricMapperLRUCache{metrics: metrics, cache: cache}, nil
}

func (m *MetricMapperLRUCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) {
cacheGetsTotal.Inc()
m.metrics.CacheGetsTotal.Inc()
if result, ok := m.cache.Get(formatKey(metricString, metricType)); ok {
cacheHitsTotal.Inc()
m.metrics.CacheHitsTotal.Inc()
return result.(*MetricMapperCacheResult), true
} else {
return nil, false
Expand All @@ -104,16 +121,15 @@ func (m *MetricMapperLRUCache) AddMiss(metricString string, metricType MetricTyp
}

func (m *MetricMapperLRUCache) trackCacheLength() {
cacheLength.Set(float64(m.cache.Len()))
m.metrics.CacheLength.Set(float64(m.cache.Len()))
}

func formatKey(metricString string, metricType MetricType) string {
return string(metricType) + "." + metricString
}

func NewMetricMapperNoopCache() *MetricMapperNoopCache {
cacheLength.Set(0)
return &MetricMapperNoopCache{}
func NewMetricMapperNoopCache(reg prometheus.Registerer) *MetricMapperNoopCache {
return &MetricMapperNoopCache{metrics: NewCacheMetrics(reg)}
}

func (m *MetricMapperNoopCache) Get(metricString string, metricType MetricType) (*MetricMapperCacheResult, bool) {
Expand All @@ -130,16 +146,18 @@ func (m *MetricMapperNoopCache) AddMiss(metricString string, metricType MetricTy

type MetricMapperRRCache struct {
MetricMapperCache
lock sync.RWMutex
size int
items map[string]*MetricMapperCacheResult
lock sync.RWMutex
size int
items map[string]*MetricMapperCacheResult
metrics *CacheMetrics
}

func NewMetricMapperRRCache(size int) (*MetricMapperRRCache, error) {
cacheLength.Set(0)
func NewMetricMapperRRCache(reg prometheus.Registerer, size int) (*MetricMapperRRCache, error) {
metrics := NewCacheMetrics(reg)
c := &MetricMapperRRCache{
items: make(map[string]*MetricMapperCacheResult, size+1),
size: size,
items: make(map[string]*MetricMapperCacheResult, size+1),
size: size,
metrics: metrics,
}
return c, nil
}
Expand Down Expand Up @@ -188,11 +206,5 @@ func (m *MetricMapperRRCache) trackCacheLength() {
m.lock.RLock()
length := len(m.items)
m.lock.RUnlock()
cacheLength.Set(float64(length))
}

func init() {
prometheus.MustRegister(cacheLength)
prometheus.MustRegister(cacheGetsTotal)
prometheus.MustRegister(cacheHitsTotal)
m.metrics.CacheLength.Set(float64(length))
}
18 changes: 10 additions & 8 deletions pkg/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,22 @@ func (u uncheckedCollector) Collect(c chan<- prometheus.Metric) {
}

type Registry struct {
Metrics map[string]metrics.Metric
Mapper *mapper.MetricMapper
Registerer prometheus.Registerer
Metrics map[string]metrics.Metric
Mapper *mapper.MetricMapper
// The below value and label variables are allocated in the registry struct
// so that we don't have to allocate them every time have to compute a label
// hash.
ValueBuf, NameBuf bytes.Buffer
Hasher hash.Hash64
}

func NewRegistry(mapper *mapper.MetricMapper) *Registry {
func NewRegistry(reg prometheus.Registerer, mapper *mapper.MetricMapper) *Registry {
return &Registry{
Metrics: make(map[string]metrics.Metric),
Mapper: mapper,
Hasher: fnv.New64a(),
Registerer: reg,
Metrics: make(map[string]metrics.Metric),
Mapper: mapper,
Hasher: fnv.New64a(),
}
}

Expand Down Expand Up @@ -170,7 +172,7 @@ func (r *Registry) GetCounter(metricName string, labels prometheus.Labels, help
Help: help,
}, labelNames)

if err := prometheus.Register(uncheckedCollector{counterVec}); err != nil {
if err := r.Registerer.Register(uncheckedCollector{counterVec}); err != nil {
return nil, err
}
} else {
Expand Down Expand Up @@ -206,7 +208,7 @@ func (r *Registry) GetGauge(metricName string, labels prometheus.Labels, help st
Help: help,
}, labelNames)

if err := prometheus.Register(uncheckedCollector{gaugeVec}); err != nil {
if err := r.Registerer.Register(uncheckedCollector{gaugeVec}); err != nil {
return nil, err
}
} else {
Expand Down