Skip to content

Commit

Permalink
filters/fields: do a deep copy before filtering
Browse files Browse the repository at this point in the history
We need to deep copy the event here to avoid issues caused by filtering out information
that is shared between events through the event cache (e.g. process info). This can cause
segmentation faults and other nasty bugs. Avoid all that by doing a deep copy here before
filtering. This is not pretty or great for performance, but it at least works as a stopgap
until we're able to refactor things so that it's no longer necessary.

Signed-off-by: William Findlay <will@isovalent.com>
  • Loading branch information
willfindlay committed Nov 6, 2023
1 parent a3dba71 commit 09de1f8
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 73 deletions.
22 changes: 8 additions & 14 deletions cmd/tetra/getevents/io_reader_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
hubbleFilters "github.com/cilium/tetragon/pkg/oldhubble/filters"
"google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)

// ioReaderClient implements tetragon.FineGuidanceSensors_GetEventsClient.
Expand Down Expand Up @@ -101,28 +100,23 @@ func (i *ioReaderClient) GetVersion(_ context.Context, _ *tetragon.GetVersionReq

func (i *ioReaderClient) Recv() (*tetragon.GetEventsResponse, error) {
for i.scanner.Scan() {
var res tetragon.GetEventsResponse
res := &tetragon.GetEventsResponse{}
line := i.scanner.Bytes()
err := i.unmarshaller.Unmarshal(line, &res)
err := i.unmarshaller.Unmarshal(line, res)
if err != nil && i.debug {
fmt.Fprintf(os.Stderr, "DEBUG: failed unmarshal: %s: %s\n", line, err)
continue
}
if !hubbleFilters.Apply(i.allowlist, nil, &hubbleV1.Event{Event: &res}) {
if !hubbleFilters.Apply(i.allowlist, nil, &hubbleV1.Event{Event: res}) {
continue
}
filterEvent := &res
if len(i.fieldFilters) > 0 && filterEvent.GetProcessExec() != nil { // this is an exec event and we have fieldFilters
// We need a copy of the exec event as modifing the original message
// can cause issues in the process cache (we keep a copy of that message there).
filterEvent = proto.Clone(&res).(*tetragon.GetEventsResponse)
}
for _, filter := range i.fieldFilters {
// we need not to change res
// maybe only for exec events
filter.Filter(filterEvent)
res, err = filter.Filter(res)
if err != nil {
return nil, err
}
}
return filterEvent, nil
return res, nil
}
if err := i.scanner.Err(); err != nil {
return nil, err
Expand Down
57 changes: 15 additions & 42 deletions pkg/filters/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/cilium/tetragon/api/v1/tetragon"
"github.com/mennanov/fmutils"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
)

Expand Down Expand Up @@ -81,48 +82,15 @@ type FieldFilter struct {
fields fmutils.NestedMask
action tetragon.FieldFilterAction
invertEventSet bool
needsCopy map[tetragon.EventType]struct{}
}

// NewFieldFilter constructs a new FieldFilter from a set of fields.
func NewFieldFilter(eventSet []tetragon.EventType, fields []string, action tetragon.FieldFilterAction, invertEventSet bool) *FieldFilter {
// We only need to copy exec and exit events when they are explicitly filtering out
// the PID. This is because we need the PID to not be nil when accessing the event
// later on from the eventcache. See additional comments below for details.
var maybeNeedsCopy bool
if action == tetragon.FieldFilterAction_EXCLUDE {
for _, field := range fields {
if strings.HasPrefix(field, "process") {
maybeNeedsCopy = true
break
}
}
} else if action == tetragon.FieldFilterAction_INCLUDE {
// For inclusion, it's the opposite situation from the above. If the process.pid
// field is NOT present, it will be trimmed. So assume we need a copy unless we
// see process.pid.
maybeNeedsCopy = true
for _, field := range fields {
if field == "process.pid" {
maybeNeedsCopy = false
break
}
}
}

needsCopy := make(map[tetragon.EventType]struct{})
if maybeNeedsCopy {
for _, t := range eventSet {
needsCopy[t] = struct{}{}
}
}

return &FieldFilter{
eventSet: eventSet,
fields: fmutils.NestedMaskFromPaths(fields),
action: action,
invertEventSet: invertEventSet,
needsCopy: needsCopy,
}
}

Expand Down Expand Up @@ -167,15 +135,20 @@ func FieldFiltersFromGetEventsRequest(request *tetragon.GetEventsRequest) []*Fie
return filters
}

func (f *FieldFilter) NeedsCopy(ev *tetragon.GetEventsResponse) bool {
_, ok := f.needsCopy[ev.EventType()]
return ok
}

// Filter filters the fields in the GetEventsResponse, keeping fields specified in the
// inclusion filter and discarding fields specified in the exclusion filter. Exclusion
// takes precedence over inclusion and an empty filter set will keep all remaining fields.
func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) error {
func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) (*tetragon.GetEventsResponse, error) {
// We need to deep copy the event here to avoid issues caused by filtering out
// information that is shared between events through the event cache (e.g. process
// info). This can cause segmentation faults and other nasty bugs. Avoid all that by
// doing a deep copy here before filtering.
//
// FIXME: We need to fix this so that it doesn't kill performance by doing a deep
// copy. This will require architectural changes to both the field filters and the
// event cache.
event = proto.Clone(event).(*tetragon.GetEventsResponse)

if len(f.eventSet) > 0 {
// skip filtering by default unless the event set is inverted, in which case we
// want to filter by default and skip only if we have a match
Expand All @@ -201,7 +174,7 @@ func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) error {
}

if skipFiltering {
return nil
return event, nil
}
}

Expand All @@ -221,8 +194,8 @@ func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) error {
})

if !rft.IsValid() {
return fmt.Errorf("invalid event after field filter")
return nil, fmt.Errorf("invalid event after field filter")
}

return nil
return event, nil
}
17 changes: 9 additions & 8 deletions pkg/filters/fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestEventFieldFilters(t *testing.T) {
// Construct the filter
filters := FieldFiltersFromGetEventsRequest(request)
for _, filter := range filters {
filter.Filter(ev)
ev, _ = filter.Filter(ev)
}

// These fields should all have been included and so should not be empty
Expand Down Expand Up @@ -125,12 +125,12 @@ func TestFieldFilterByEventType(t *testing.T) {
}

filter := NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXIT}, []string{"process.pid"}, false)
filter.Filter(ev)
ev, _ = filter.Filter(ev)

assert.NotEmpty(t, ev.GetProcessExec().Process.Pid)

filter = NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXEC}, []string{"process.pid"}, false)
filter.Filter(ev)
ev, _ = filter.Filter(ev)

assert.Empty(t, ev.GetProcessExec().Process.Pid)
}
Expand Down Expand Up @@ -225,7 +225,7 @@ func TestEmptyFieldFilter(t *testing.T) {
}

assert.True(t, proto.Equal(ev, expected), "events are equal before filter")
filter.Filter(ev)
ev, _ = filter.Filter(ev)
assert.True(t, proto.Equal(ev, expected), "events are equal after filter")
}

Expand All @@ -250,7 +250,7 @@ func TestFieldFilterInvertedEventSet(t *testing.T) {

filter := NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXEC}, []string{"process", "parent"}, true)
assert.True(t, proto.Equal(ev, expected), "events are equal before filter")
filter.Filter(ev)
ev, _ = filter.Filter(ev)
assert.True(t, proto.Equal(ev, expected), "events are equal after filter")

ev = &tetragon.GetEventsResponse{
Expand All @@ -270,7 +270,7 @@ func TestFieldFilterInvertedEventSet(t *testing.T) {

filter = NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_KPROBE}, []string{"process", "parent"}, true)
assert.False(t, proto.Equal(ev, expected), "events are not equal before filter")
filter.Filter(ev)
ev, _ = filter.Filter(ev)
assert.True(t, proto.Equal(ev, expected), "events are equal after filter")
}

Expand Down Expand Up @@ -599,8 +599,9 @@ func TestSlimExecEventsFieldFilterExample(t *testing.T) {
}

for _, filter := range filters {
for _, ev := range evs {
filter.Filter(ev)
for i, ev := range evs {
ev, _ = filter.Filter(ev)
evs[i] = ev
}
}
for i := range evs {
Expand Down
16 changes: 7 additions & 9 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cilium/tetragon/pkg/tracingpolicy"
"github.com/cilium/tetragon/pkg/version"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)

type Listener interface {
Expand Down Expand Up @@ -165,29 +164,28 @@ func (s *Server) GetEventsWG(request *tetragon.GetEventsRequest, server tetragon

// Filter the GetEventsResponse fields
filters := filters.FieldFiltersFromGetEventsRequest(request)
filterEvent := event

for _, filter := range filters {
// We need a copy of the exec or exit event as modifing the original message
// can cause issues in the process cache (we keep a copy of that message there).
if filter.NeedsCopy(filterEvent) && filterEvent == event {
filterEvent = proto.Clone(event).(*tetragon.GetEventsResponse)
ev, err := filter.Filter(event)
if err != nil {
logger.GetLogger().WithField("filter", filter).WithError(err).Warn("Failed to apply field filter")
continue
}
filter.Filter(filterEvent)
event = ev
}

if aggregator != nil {
// Send event to aggregator.
select {
case aggregator.GetEventChannel() <- filterEvent:
case aggregator.GetEventChannel() <- event:
default:
logger.GetLogger().
WithField("request", request).
Warn("Aggregator buffer is full. Consider increasing AggregatorOptions.channel_buffer_size.")
}
} else {
// No need to aggregate. Directly send out the response.
if err = server.Send(filterEvent); err != nil {
if err = server.Send(event); err != nil {
s.ctxCleanupWG.Done()
return err
}
Expand Down

0 comments on commit 09de1f8

Please sign in to comment.