Skip to content

Commit

Permalink
mapmetrics: Define size and errors metrics as custom collectors
Browse files Browse the repository at this point in the history
This commit reimplements two metrics: tetragon_map_in_use_gauge and
tetragon_map_errors_total, so that they are exposed by custom Prometheus
collectors. This means that metric values are read from BPF maps at the scrape
time, without a need for a userland tickers. From the user perspective, the
metrics are more accurate.

Additionally, the errors metric is exposed as a counter instead of a gauge.

Signed-off-by: Anna Kapuscinska <anna@isovalent.com>
  • Loading branch information
lambdanis committed Oct 3, 2023
1 parent c95db9e commit 9405281
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 103 deletions.
6 changes: 4 additions & 2 deletions pkg/eventcache/eventcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/cilium/tetragon/pkg/ktime"
"github.com/cilium/tetragon/pkg/metrics/errormetrics"
"github.com/cilium/tetragon/pkg/metrics/eventcachemetrics"
"github.com/cilium/tetragon/pkg/metrics/mapmetrics"
"github.com/cilium/tetragon/pkg/option"
"github.com/cilium/tetragon/pkg/process"
"github.com/cilium/tetragon/pkg/reader/node"
Expand Down Expand Up @@ -167,7 +166,6 @@ func (ec *Cache) loop() {
* event anyways.
*/
ec.handleEvents()
mapmetrics.MapSizeSet("eventcache", 0, float64(len(ec.cache)))

case event := <-ec.objsChan:
eventcachemetrics.EventCacheCount.Inc()
Expand Down Expand Up @@ -241,3 +239,7 @@ func New(s *server.Server) *Cache {
func Get() *Cache {
return cache
}

func (ec *Cache) len() int {
return len(ec.cache)
}
30 changes: 30 additions & 0 deletions pkg/eventcache/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Tetragon

package eventcache

import (
"github.com/cilium/tetragon/pkg/metrics/mapmetrics"
"github.com/prometheus/client_golang/prometheus"
)

// bpfCollector implements prometheus.Collector. It collects metrics directly from BPF maps.
type bpfCollector struct{}

func NewBPFCollector() prometheus.Collector {
return &bpfCollector{}
}

func (c *bpfCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- mapmetrics.MapSize.Desc()
}

func (c *bpfCollector) Collect(ch chan<- prometheus.Metric) {
ec := Get()
if ec != nil {
ch <- mapmetrics.MapSize.MustMetric(
float64(ec.len()),
"eventcache", "0",
)
}
}
10 changes: 10 additions & 0 deletions pkg/metrics/config/initmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package config

import (
"github.com/cilium/tetragon/pkg/eventcache"
"github.com/cilium/tetragon/pkg/grpc/tracing"
"github.com/cilium/tetragon/pkg/metrics/errormetrics"
"github.com/cilium/tetragon/pkg/metrics/eventcachemetrics"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/cilium/tetragon/pkg/metrics/syscallmetrics"
"github.com/cilium/tetragon/pkg/metrics/watchermetrics"
"github.com/cilium/tetragon/pkg/observer"
"github.com/cilium/tetragon/pkg/process"
"github.com/cilium/tetragon/pkg/version"
grpcmetrics "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -44,6 +46,14 @@ func InitAllMetrics(registry *prometheus.Registry) {
tracing.InitMetrics(registry)
ratelimitmetrics.InitMetrics(registry)

// register BPF collectors
registry.MustRegister(mapmetrics.NewBPFCollector(
eventcache.NewBPFCollector(),
observer.NewBPFCollector(),
process.NewBPFCollector(),
))

// register common third-party collectors
registry.MustRegister(collectors.NewGoCollector())
registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
registry.MustRegister(grpcmetrics.NewServerMetrics())
Expand Down
68 changes: 33 additions & 35 deletions pkg/metrics/mapmetrics/mapmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,64 +4,62 @@
package mapmetrics

import (
"fmt"

"github.com/cilium/tetragon/pkg/metrics"
"github.com/cilium/tetragon/pkg/metrics/consts"
"github.com/prometheus/client_golang/prometheus"
)

var (
MapSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: consts.MetricsNamespace,
Name: "map_in_use_gauge",
Help: "The total number of in-use entries per map.",
ConstLabels: nil,
}, []string{"map", "total"})

MapDrops = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: consts.MetricsNamespace,
Name: "map_drops_total",
Help: "The total number of entries dropped per LRU map.",
ConstLabels: nil,
}, []string{"map"})

MapErrors = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: consts.MetricsNamespace,
Name: "map_errors_total",
Help: "The total number of entries dropped per LRU map.",
ConstLabels: nil,
}, []string{"map"})
MapSize = metrics.NewBPFGauge(prometheus.NewDesc(
prometheus.BuildFQName(consts.MetricsNamespace, "", "map_in_use_gauge"),
"The total number of in-use entries per map.",
[]string{"map", "total"}, nil,
))
MapErrors = metrics.NewBPFCounter(prometheus.NewDesc(
prometheus.BuildFQName(consts.MetricsNamespace, "", "map_errors_total"),
"The total number of entries dropped per LRU map.",
[]string{"map"}, nil,
))
)

func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(MapSize)
registry.MustRegister(MapDrops)
registry.MustRegister(MapErrors)
}

// Get a new handle on a mapSize metric for a mapName and totalCapacity
func GetMapSize(mapName string, totalCapacity int) prometheus.Gauge {
return MapSize.WithLabelValues(mapName, fmt.Sprint(totalCapacity))
// custom collectors are registered independently
}

func GetMapErrors(mapName string) prometheus.Gauge {
return MapErrors.WithLabelValues(mapName)
func MapDropInc(mapName string) {
MapDrops.WithLabelValues(mapName).Inc()
}

// Increment a mapSize metric for a mapName and totalCapacity
func MapSizeInc(mapName string, totalCapacity int) {
GetMapSize(mapName, totalCapacity).Inc()
// bpfCollector implements prometheus.Collector. It collects metrics directly from BPF maps.
// NB: We can't register individual BPF collectors collecting map metrics, because they share the
// metrics descriptors. Sending duplicate descriptors from different collectors results in
// a panic. Sending duplicate descriptors from the same collector is fine, so we define a simple
// wrapper for all collectors collecting map metrics.
type bpfCollector struct {
collectors []prometheus.Collector
}

// Set a mapSize metric to size for a mapName and totalCapacity
func MapSizeSet(mapName string, totalCapacity int, size float64) {
GetMapSize(mapName, totalCapacity).Set(size)
func NewBPFCollector(collectors ...prometheus.Collector) prometheus.Collector {
return &bpfCollector{
collectors: collectors,
}
}

func MapErrorSet(mapName string, errTotal float64) {
GetMapErrors(mapName).Set(errTotal)
func (c *bpfCollector) Describe(ch chan<- *prometheus.Desc) {
for _, m := range c.collectors {
m.Describe(ch)
}
}

func MapDropInc(mapName string) {
MapDrops.WithLabelValues(mapName).Inc()
func (c *bpfCollector) Collect(ch chan<- prometheus.Metric) {
for _, m := range c.collectors {
m.Collect(ch)
}
}
2 changes: 0 additions & 2 deletions pkg/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,6 @@ func (k *Observer) UpdateRuntimeConf(mapDir string) error {

// Start starts the observer
func (k *Observer) Start(ctx context.Context) error {
k.startUpdateMapMetrics()

k.PerfConfig = bpf.DefaultPerfEventConfig()

var err error
Expand Down
90 changes: 46 additions & 44 deletions pkg/observer/observer_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,51 @@
package observer

import (
"fmt"
"path/filepath"
"time"

"github.com/cilium/ebpf"
"github.com/cilium/tetragon/pkg/metrics/mapmetrics"
"github.com/cilium/tetragon/pkg/option"
"github.com/cilium/tetragon/pkg/sensors"
"github.com/prometheus/client_golang/prometheus"
)

func updateMapSize(mapLinkStats *ebpf.Map, maxEntries int, name string) {
// bpfCollector implements prometheus.Collector. It collects metrics directly from BPF maps.
type bpfCollector struct{}

func NewBPFCollector() prometheus.Collector {
return &bpfCollector{}
}

func (c *bpfCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- mapmetrics.MapSize.Desc()
ch <- mapmetrics.MapErrors.Desc()
}

func (c *bpfCollector) Collect(ch chan<- prometheus.Metric) {
for _, m := range sensors.AllMaps {
name := m.Name
pin := filepath.Join(option.Config.MapDir, name)
pinStats := pin + "_stats"

mapLinkStats, err := ebpf.LoadPinnedMap(pinStats, nil)
if err != nil {
return
}
defer mapLinkStats.Close()
mapLink, err := ebpf.LoadPinnedMap(pin, nil)
if err != nil {
return
}
defer mapLink.Close()

updateMapSize(ch, mapLinkStats, int(mapLink.MaxEntries()), name)
updateMapErrors(ch, mapLinkStats, name)
}
}

func updateMapSize(ch chan<- prometheus.Metric, mapLinkStats *ebpf.Map, maxEntries int, name string) {
var values []int64
if err := mapLinkStats.Lookup(int32(0), &values); err != nil {
return
Expand All @@ -23,11 +58,13 @@ func updateMapSize(mapLinkStats *ebpf.Map, maxEntries int, name string) {
for _, n := range values {
sum += n
}

mapmetrics.MapSizeSet(name, maxEntries, float64(sum))
ch <- mapmetrics.MapSize.MustMetric(
float64(sum),
name, fmt.Sprint(maxEntries),
)
}

func updateMapErrors(mapLinkStats *ebpf.Map, name string) {
func updateMapErrors(ch chan<- prometheus.Metric, mapLinkStats *ebpf.Map, name string) {
var values []int64
if err := mapLinkStats.Lookup(int32(1), &values); err != nil {
return
Expand All @@ -37,43 +74,8 @@ func updateMapErrors(mapLinkStats *ebpf.Map, name string) {
for _, n := range values {
sum += n
}

mapmetrics.MapErrorSet(name, float64(sum))
}

func updateMapMetric(name string) {
pin := filepath.Join(option.Config.MapDir, name)
pinStats := pin + "_stats"

mapLinkStats, err := ebpf.LoadPinnedMap(pinStats, nil)
if err != nil {
return
}
defer mapLinkStats.Close()
mapLink, err := ebpf.LoadPinnedMap(pin, nil)
if err != nil {
return
}
defer mapLink.Close()

updateMapSize(mapLinkStats, int(mapLink.MaxEntries()), name)
updateMapErrors(mapLinkStats, name)
}

func (k *Observer) startUpdateMapMetrics() {
update := func() {
for _, m := range sensors.AllMaps {
updateMapMetric(m.Name)
}
}

ticker := time.NewTicker(30 * time.Second)
go func() {
for {
select {
case <-ticker.C:
update()
}
}
}()
ch <- mapmetrics.MapErrors.MustMetric(
float64(sum),
name,
)
}
25 changes: 5 additions & 20 deletions pkg/process/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (
)

type Cache struct {
cache *lru.Cache[string, *ProcessInternal]
deleteChan chan *ProcessInternal
stopChan chan bool
metricsStopChan chan bool
cache *lru.Cache[string, *ProcessInternal]
size int
deleteChan chan *ProcessInternal
stopChan chan bool
}

// garbage collection states
Expand Down Expand Up @@ -124,7 +124,6 @@ func (pc *Cache) refInc(p *ProcessInternal) {

func (pc *Cache) Purge() {
pc.stopChan <- true
pc.metricsStopChan <- true
}

func NewCache(
Expand All @@ -141,22 +140,8 @@ func NewCache(
}
pm := &Cache{
cache: lruCache,
size: processCacheSize,
}
update := func() {
mapmetrics.MapSizeSet("processLru", processCacheSize, float64(pm.cache.Len()))
}
ticker := time.NewTicker(60 * time.Second)
pm.metricsStopChan = make(chan bool)
go func() {
for {
select {
case <-ticker.C:
update()
case <-pm.metricsStopChan:
return
}
}
}()
pm.cacheGarbageCollector()
return pm, nil
}
Expand Down
31 changes: 31 additions & 0 deletions pkg/process/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Tetragon

package process

import (
"fmt"

"github.com/cilium/tetragon/pkg/metrics/mapmetrics"
"github.com/prometheus/client_golang/prometheus"
)

// bpfCollector implements prometheus.Collector. It collects metrics directly from BPF maps.
type bpfCollector struct{}

func NewBPFCollector() prometheus.Collector {
return &bpfCollector{}
}

func (c *bpfCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- mapmetrics.MapSize.Desc()
}

func (c *bpfCollector) Collect(ch chan<- prometheus.Metric) {
if procCache != nil {
ch <- mapmetrics.MapSize.MustMetric(
float64(procCache.len()),
"processLru", fmt.Sprint(procCache.size),
)
}
}

0 comments on commit 9405281

Please sign in to comment.