Skip to content

Commit

Permalink
tetragon: Add queue between perf reader and sensor processing code
Browse files Browse the repository at this point in the history
Adding extra layer between perf ring buffer reader and event
processing by sensor code.

Basically it's new channel (default size 65k) that is written by
perf ringbuffer reader and read by new go routine that processes
these events.

This way we keep the perf reader fast and we won't get lost events
if sensor processing code have a hiccup and takes longer.

When the eventsQueue channel is full we drop the event. It's configurable
by new --events-queue-size option, the default is 65k.

Signed-off-by: Jiri Olsa <jolsa@kernel.org>
  • Loading branch information
olsajiri committed Aug 17, 2023
1 parent f59955a commit 652ba61
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 3 deletions.
4 changes: 4 additions & 0 deletions cmd/tetragon/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ const (
keyRBSize = "rb-size"
keyRBSizeTotal = "rb-size-total"

keyEventsQueueSize = "events-queue-size"

keyEventQueueSize = "event-queue-size"

keyReleasePinnedBPF = "release-pinned-bpf"
Expand Down Expand Up @@ -103,6 +105,8 @@ func readAndSetFlags() {
option.Config.RBSize = viper.GetInt(keyRBSize)
option.Config.RBSizeTotal = viper.GetInt(keyRBSizeTotal)

option.Config.EventsQueueSize = viper.GetInt(keyEventsQueueSize)

option.Config.GopsAddr = viper.GetString(keyGopsAddr)

logLevel := viper.GetString(keyLogLevel)
Expand Down
2 changes: 2 additions & 0 deletions cmd/tetragon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,8 @@ func execute() error {

flags.StringSlice(keyKmods, []string{}, "List of kernel modules to load symbols from")

flags.Int(keyEventsQueueSize, 65535, "Set size of channel between ring buffer and sensor go routines (default 65k)")

viper.BindPFlags(flags)
return rootCmd.Execute()
}
Expand Down
38 changes: 35 additions & 3 deletions pkg/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ func (k *Observer) receiveEvent(data []byte) {
if option.Config.EnableMsgHandlingLatency {
timer = time.Now()
}
atomic.AddUint64(&k.recvCntr, 1)

op, events, err := HandlePerfData(data)
opcodemetrics.OpTotalInc(int(op))
Expand Down Expand Up @@ -186,6 +185,15 @@ func (k *Observer) getRBSize(cpus int) int {
return size
}

func (k *Observer) getEventsQueueSize() int {
size := option.Config.EventsQueueSize
if size == 0 {
size = 65535
}
k.log.WithField("size", size).Info("Events queue size (events)")
return size
}

func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error {
pinOpts := ebpf.LoadPinOptions{}
perfMap, err := ebpf.LoadPinnedMap(k.PerfConfig.MapName, &pinOpts)
Expand All @@ -205,6 +213,10 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error {
k.observerListeners(&readyapi.MsgTetragonReady{})
ready()

// We spawn go routine to read and process perf events,
// connected with main app through eventsQueue channel.
eventsQueue := make(chan *perf.Record, k.getEventsQueueSize())

// Listeners are ready and about to start reading from perf reader, tell
// user everything is ready.
k.log.Info("Listening for events...")
Expand All @@ -226,7 +238,12 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error {
}
} else {
if len(record.RawSample) > 0 {
k.receiveEvent(record.RawSample)
select {
case eventsQueue <- &record:
default:
// eventsQueue channel is full, drop the event
}
k.recvCntr++
ringbufmetrics.PerfEventReceived.Inc()
}

Expand All @@ -236,7 +253,22 @@ func (k *Observer) RunEvents(stopCtx context.Context, ready func()) error {
}
}
}
k.log.WithError(stopCtx.Err()).Info("Listening for events completed.")
}()

// Start processing records from perf.
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case event := <-eventsQueue:
k.receiveEvent(event.RawSample)
case <-stopCtx.Done():
k.log.WithError(stopCtx.Err()).Infof("Listening for events completed.")
k.log.Debugf("Unprocessed events in RB queue: %d", len(eventsQueue))
return
}
}
}()

// Loading default program consumes some memory lets kick GC to give
Expand Down
2 changes: 2 additions & 0 deletions pkg/option/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type config struct {
MemProfile string
PprofAddr string

EventsQueueSize int

EventQueueSize uint

ReleasePinned bool
Expand Down

0 comments on commit 652ba61

Please sign in to comment.