Skip to content

Commit

Permalink
tracing: support custom event handlers
Browse files Browse the repository at this point in the history
Add support for policies to define their own custom event handlers.

Signed-off-by: Kornilios Kourtis <kornilios@isovalent.com>
  • Loading branch information
kkourt committed Oct 16, 2023
1 parent 66c641b commit a57fe77
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 22 deletions.
27 changes: 27 additions & 0 deletions pkg/eventhandler/eventhandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Tetragon

package eventhandler

import (
"github.com/cilium/tetragon/pkg/observer"
)

type Handler func([]observer.Event, error) ([]observer.Event, error)

// CustomEventhandler allows components to define their custom event handling.
// This is intended for policies to:
// - map events / and errors from tracing sensors (e.g., kprobe or tracepoints)
// - generate custom metrics
//
// Other use-cases might be served from this as well
type HasCustomHandler interface {
Handler() Handler
}

func GetCustomEventhandler(obj interface{}) Handler {
if ceh, ok := obj.(HasCustomHandler); ok {
return ceh.Handler()
}
return nil
}
29 changes: 20 additions & 9 deletions pkg/sensors/tracing/generickprobe.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cilium/tetragon/pkg/bpf"
"github.com/cilium/tetragon/pkg/btf"
cachedbtf "github.com/cilium/tetragon/pkg/btf"
"github.com/cilium/tetragon/pkg/eventhandler"
"github.com/cilium/tetragon/pkg/grpc/tracing"
"github.com/cilium/tetragon/pkg/idtable"
"github.com/cilium/tetragon/pkg/k8s/apis/cilium.io/v1alpha1"
Expand Down Expand Up @@ -131,6 +132,8 @@ type genericKprobe struct {
// reference to a stack trace map, must be closed when unloading the kprobe,
// this is done in the sensor PostUnloadHook
stackTraceMapRef *ebpf.Map

customHandler eventhandler.Handler
}

// pendingEvent is an event waiting to be merged with another event.
Expand Down Expand Up @@ -459,10 +462,11 @@ func isLTOperator(op string) bool {
}

type addKprobeIn struct {
useMulti bool
sensorPath string
policyName string
policyID policyfilter.PolicyID
useMulti bool
sensorPath string
policyName string
policyID policyfilter.PolicyID
customHandler eventhandler.Handler
}

type addKprobeOut struct {
Expand Down Expand Up @@ -494,6 +498,7 @@ func createGenericKprobeSensor(
policyID policyfilter.PolicyID,
policyName string,
lists []v1alpha1.ListSpec,
customHandler eventhandler.Handler,
) (*sensors.Sensor, error) {
var progs []*program.Program
var maps []*program.Map
Expand All @@ -508,10 +513,11 @@ func createGenericKprobeSensor(
bpf.HasKprobeMulti()

in := addKprobeIn{
useMulti: useMulti,
sensorPath: name,
policyID: policyID,
policyName: policyName,
useMulti: useMulti,
sensorPath: name,
policyID: policyID,
policyName: policyName,
customHandler: customHandler,
}

addedKprobeIndices := []int{}
Expand Down Expand Up @@ -744,6 +750,7 @@ func addKprobe(funcName string, f *v1alpha1.KProbeSpec, in *addKprobeIn, selMaps
tableId: idtable.UninitializedEntryID,
policyName: in.policyName,
hasOverride: selectors.HasOverride(f),
customHandler: in.customHandler,
}

// Parse Filters into kernel filter logic
Expand Down Expand Up @@ -1148,7 +1155,11 @@ func handleGenericKprobe(r *bytes.Reader) ([]observer.Event, error) {
return nil, fmt.Errorf("Failed to match id")
}

return handleMsgGenericKprobe(&m, gk, r)
ret, err := handleMsgGenericKprobe(&m, gk, r)
if gk.customHandler != nil {
ret, err = gk.customHandler(ret, err)
}
return ret, err
}

func handleMsgGenericKprobe(m *api.MsgGenericKprobe, gk *genericKprobe, r *bytes.Reader) ([]observer.Event, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sensors/tracing/generickprobe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func Test_SensorPostUnloadHook(t *testing.T) {
Call: "test_symbol",
Syscall: false,
},
}, 0, "test_policy", nil)
}, 0, "test_policy", nil, nil)
if err != nil {
t.Errorf("createGenericKprobeSensor err expected: nil, got: %s", err)
}
Expand Down
25 changes: 18 additions & 7 deletions pkg/sensors/tracing/generictracepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cilium/tetragon/pkg/api/ops"
"github.com/cilium/tetragon/pkg/api/tracingapi"
api "github.com/cilium/tetragon/pkg/api/tracingapi"
"github.com/cilium/tetragon/pkg/eventhandler"
"github.com/cilium/tetragon/pkg/grpc/tracing"
"github.com/cilium/tetragon/pkg/idtable"
"github.com/cilium/tetragon/pkg/k8s/apis/cilium.io/v1alpha1"
Expand Down Expand Up @@ -84,6 +85,9 @@ type genericTracepoint struct {

// parsed kernel selector state
selectors *selectors.KernelSelectorState

// custom event handler
customHandler eventhandler.Handler
}

// genericTracepointArg is the internal representation of an output value of a
Expand Down Expand Up @@ -317,6 +321,7 @@ func createGenericTracepoint(
conf *GenericTracepointConf,
policyID policyfilter.PolicyID,
policyName string,
customHandler eventhandler.Handler,
) (*genericTracepoint, error) {
tp := tracepoint.Tracepoint{
Subsys: conf.Subsystem,
Expand All @@ -333,11 +338,12 @@ func createGenericTracepoint(
}

ret := &genericTracepoint{
Info: &tp,
Spec: conf,
args: tpArgs,
policyID: policyID,
policyName: policyName,
Info: &tp,
Spec: conf,
args: tpArgs,
policyID: policyID,
policyName: policyName,
customHandler: customHandler,
}

genericTracepointTable.addTracepoint(ret)
Expand All @@ -352,11 +358,12 @@ func createGenericTracepointSensor(
policyID policyfilter.PolicyID,
policyName string,
lists []v1alpha1.ListSpec,
customHandler eventhandler.Handler,
) (*sensors.Sensor, error) {

tracepoints := make([]*genericTracepoint, 0, len(confs))
for i := range confs {
tp, err := createGenericTracepoint(name, &confs[i], policyID, policyName)
tp, err := createGenericTracepoint(name, &confs[i], policyID, policyName, customHandler)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -631,7 +638,11 @@ func handleGenericTracepoint(r *bytes.Reader) ([]observer.Event, error) {
return []observer.Event{unix}, nil
}

return handleMsgGenericTracepoint(&m, unix, tp, r)
ret, err := handleMsgGenericTracepoint(&m, unix, tp, r)
if tp.customHandler != nil {
ret, err = tp.customHandler(ret, err)
}
return ret, err
}

func handleMsgGenericTracepoint(
Expand Down
7 changes: 5 additions & 2 deletions pkg/sensors/tracing/policyhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"sync/atomic"

"github.com/cilium/tetragon/pkg/eventhandler"
"github.com/cilium/tetragon/pkg/policyfilter"
"github.com/cilium/tetragon/pkg/sensors"
"github.com/cilium/tetragon/pkg/tracingpolicy"
Expand All @@ -29,17 +30,19 @@ func (h policyHandler) PolicyHandler(
if len(spec.KProbes) > 0 && len(spec.Tracepoints) > 0 {
return nil, errors.New("tracing policies with both kprobes and tracepoints are not currently supported")
}

handler := eventhandler.GetCustomEventhandler(policy)
if len(spec.KProbes) > 0 {
name := fmt.Sprintf("gkp-sensor-%d", atomic.AddUint64(&sensorCounter, 1))
err := preValidateKprobes(name, spec.KProbes, spec.Lists)
if err != nil {
return nil, fmt.Errorf("validation failed: %w", err)
}
return createGenericKprobeSensor(name, spec.KProbes, policyID, policyName, spec.Lists)
return createGenericKprobeSensor(name, spec.KProbes, policyID, policyName, spec.Lists, handler)
}
if len(spec.Tracepoints) > 0 {
name := fmt.Sprintf("gtp-sensor-%d", atomic.AddUint64(&sensorCounter, 1))
return createGenericTracepointSensor(name, spec.Tracepoints, policyID, policyName, spec.Lists)
return createGenericTracepointSensor(name, spec.Tracepoints, policyID, policyName, spec.Lists, handler)
}
return nil, nil
}
6 changes: 3 additions & 3 deletions pkg/sensors/tracing/tracepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestGenericTracepointSimple(t *testing.T) {
sm := tus.GetTestSensorManager(ctx, t)
// create and add sensor
sensor, err := createGenericTracepointSensor("GtpLseekTest", []GenericTracepointConf{lseekConf}, policyfilter.NoFilterID,
"policyName", []v1alpha1.ListSpec{})
"policyName", []v1alpha1.ListSpec{}, nil)
if err != nil {
t.Fatalf("failed to create generic tracepoint sensor: %s", err)
}
Expand Down Expand Up @@ -138,7 +138,7 @@ func doTestGenericTracepointPidFilter(t *testing.T, conf GenericTracepointConf,
sm := tus.GetTestSensorManager(ctx, t)
// create and add sensor
sensor, err := createGenericTracepointSensor("GtpLseekTest", []GenericTracepointConf{conf}, policyfilter.NoFilterID,
"policyName", []v1alpha1.ListSpec{})
"policyName", []v1alpha1.ListSpec{}, nil)
if err != nil {
t.Fatalf("failed to create generic tracepoint sensor: %s", err)
}
Expand Down Expand Up @@ -534,7 +534,7 @@ func TestTracepointCloneThreads(t *testing.T) {
sm := tus.GetTestSensorManager(ctx, t)
// create and add sensor
sensor, err := createGenericTracepointSensor("GtpLseekTest", []GenericTracepointConf{lseekConf}, policyfilter.NoFilterID,
"policyName", []v1alpha1.ListSpec{})
"policyName", []v1alpha1.ListSpec{}, nil)
if err != nil {
t.Fatalf("failed to create generic tracepoint sensor: %s", err)
}
Expand Down

0 comments on commit a57fe77

Please sign in to comment.