Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tetragon: Add buffer between perf reader and events processing code #593

Merged
merged 2 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/tetragon/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ const (

keyRBSize = "rb-size"
keyRBSizeTotal = "rb-size-total"
keyRBQueueSize = "rb-queue-size"

keyEventQueueSize = "event-queue-size"

Expand Down Expand Up @@ -102,6 +103,7 @@ func readAndSetFlags() {

option.Config.RBSize = viper.GetInt(keyRBSize)
option.Config.RBSizeTotal = viper.GetInt(keyRBSizeTotal)
option.Config.RBQueueSize = viper.GetInt(keyRBQueueSize)

option.Config.GopsAddr = viper.GetString(keyGopsAddr)

Expand Down
2 changes: 2 additions & 0 deletions cmd/tetragon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,8 @@ func execute() error {

flags.StringSlice(keyKmods, []string{}, "List of kernel modules to load symbols from")

flags.Int(keyRBQueueSize, 65535, "Set size of channel between ring buffer and sensor go routines (default 65k)")

viper.BindPFlags(flags)
return rootCmd.Execute()
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/metrics/config/initmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
pfmetrics "github.com/cilium/tetragon/pkg/metrics/policyfilter"
"github.com/cilium/tetragon/pkg/metrics/processexecmetrics"
"github.com/cilium/tetragon/pkg/metrics/ringbufmetrics"
"github.com/cilium/tetragon/pkg/metrics/ringbufqueuemetrics"
"github.com/cilium/tetragon/pkg/metrics/syscallmetrics"
"github.com/cilium/tetragon/pkg/metrics/watchermetrics"
"github.com/cilium/tetragon/pkg/observer"
Expand All @@ -30,6 +31,7 @@ func InitAllMetrics(registry *prometheus.Registry) {
pfmetrics.InitMetrics(registry)
processexecmetrics.InitMetrics(registry)
ringbufmetrics.InitMetrics(registry)
ringbufqueuemetrics.InitMetrics(registry)
syscallmetrics.InitMetrics(registry)
watchermetrics.InitMetrics(registry)
observer.InitMetrics(registry)
Expand Down
29 changes: 29 additions & 0 deletions pkg/metrics/ringbufqueuemetrics/ringbufqueuemetrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Tetragon

package ringbufqueuemetrics

import (
"github.com/cilium/tetragon/pkg/metrics/consts"
"github.com/prometheus/client_golang/prometheus"
)

// Counters describing traffic through the queue that sits between the perf
// ring buffer reader and the events processing code.
var (
	// Received counts events that made it through the ring buffer queue.
	Received = newQueueCounter(
		"ringbuf_queue_received_total",
		"The total number of Tetragon events ring buffer queue received.",
	)
	// Lost counts events that were dropped at the ring buffer queue.
	Lost = newQueueCounter(
		"ringbuf_queue_lost_total",
		"The total number of Tetragon events ring buffer queue lost.",
	)
)

// newQueueCounter builds a label-less counter in the Tetragon metrics
// namespace with the given name and help text.
func newQueueCounter(name, help string) prometheus.Counter {
	opts := prometheus.CounterOpts{
		Namespace: consts.MetricsNamespace,
		Name:      name,
		Help:      help,
	}
	return prometheus.NewCounter(opts)
}

// InitMetrics registers the ring buffer queue counters (Received and Lost)
// with the given registry. Registration goes through MustRegister, which
// panics if a collector cannot be registered.
func InitMetrics(registry *prometheus.Registry) {
	registry.MustRegister(Received, Lost)
}
41 changes: 38 additions & 3 deletions pkg/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cilium/tetragon/pkg/metrics/errormetrics"
"github.com/cilium/tetragon/pkg/metrics/opcodemetrics"
"github.com/cilium/tetragon/pkg/metrics/ringbufmetrics"
"github.com/cilium/tetragon/pkg/metrics/ringbufqueuemetrics"
"github.com/cilium/tetragon/pkg/option"
"github.com/cilium/tetragon/pkg/reader/notify"
"github.com/cilium/tetragon/pkg/sensors"
Expand Down Expand Up @@ -126,7 +127,6 @@ func (k *Observer) receiveEvent(data []byte) {
if option.Config.EnableMsgHandlingLatency {
timer = time.Now()
}
atomic.AddUint64(&k.recvCntr, 1)

op, events, err := HandlePerfData(data)
opcodemetrics.OpTotalInc(int(op))
Expand Down Expand Up @@ -186,6 +186,15 @@ func (k *Observer) getRBSize(cpus int) int {
return size
}

// getRBQueueSize returns the capacity, in events, to use for the queue that
// buffers perf ring buffer records before they are processed.
//
// The value is read from option.Config.RBQueueSize. Any non-positive setting
// falls back to the default of 65535: the result is used as a channel
// capacity, and creating a channel with a negative size panics at runtime.
// The selected size is logged at Info level before it is returned.
func (k *Observer) getRBQueueSize() int {
	// Default queue capacity, used when no usable value is configured.
	const defaultRBQueueSize = 65535

	size := option.Config.RBQueueSize
	if size <= 0 {
		size = defaultRBQueueSize
	}
	k.log.WithField("size", size).Info("Perf ring buffer events queue size (events)")
	return size
}

func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error {
pinOpts := ebpf.LoadPinOptions{}
perfMap, err := ebpf.LoadPinnedMap(k.PerfConfig.MapName, &pinOpts)
Expand All @@ -205,6 +214,10 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error {
k.observerListeners(&readyapi.MsgTetragonReady{})
ready()

// One goroutine reads records from the perf reader and a second one
// processes them; the two are connected through the eventsQueue channel.
eventsQueue := make(chan *perf.Record, k.getRBQueueSize())

// Listeners are ready and about to start reading from perf reader, tell
// user everything is ready.
k.log.Info("Listening for events...")
Expand All @@ -226,7 +239,13 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error {
}
} else {
if len(record.RawSample) > 0 {
k.receiveEvent(record.RawSample)
select {
case eventsQueue <- &record:
default:
// eventsQueue channel is full, drop the event
kkourt marked this conversation as resolved.
Show resolved Hide resolved
ringbufqueuemetrics.Lost.Inc()
}
k.recvCntr++
ringbufmetrics.PerfEventReceived.Inc()
}

Expand All @@ -236,7 +255,23 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error {
}
}
}
k.log.WithError(stopCtx.Err()).Info("Listening for events completed.")
}()

// Start processing records from perf.
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case event := <-eventsQueue:
k.receiveEvent(event.RawSample)
ringbufqueuemetrics.Received.Inc()
case <-stopCtx.Done():
k.log.WithError(stopCtx.Err()).Infof("Listening for events completed.")
k.log.Debugf("Unprocessed events in RB queue: %d", len(eventsQueue))
return
}
}
}()

// Loading default program consumes some memory lets kick GC to give
Expand Down
1 change: 1 addition & 0 deletions pkg/option/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type config struct {

RBSize int
RBSizeTotal int
RBQueueSize int

ProcessCacheSize int
DataCacheSize int
Expand Down