Skip to content

Commit

Permalink
Use a message copy to apply fieldFilters in exec events
Browse files Browse the repository at this point in the history
As we keep them in the process cache, removing fields may affect the
correctness of the process cache. In order to fix that, we create a copy
of the event and apply fieldFilters only on the copy.

FIXES: #1428

Signed-off-by: Anastasios Papagiannis <tasos.papagiannnis@gmail.com>
  • Loading branch information
tpapagian committed Sep 5, 2023
1 parent cc75495 commit 0d85e0d
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 6 deletions.
15 changes: 12 additions & 3 deletions cmd/tetra/getevents/io_reader_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ import (
hubbleFilters "github.com/cilium/tetragon/pkg/oldhubble/filters"
"google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)

// ioReaderClient implements tetragon.FineGuidanceSensors_GetEventsClient.
// ioReaderObserver implements tetragon.FineGuidanceSensorsClient interface. It reads FGS events
// ioReaderObserver implements tetragon.FineGuidanceSensorsClient interface. It reads Tetragon events
type ioReaderClient struct {
scanner *bufio.Scanner
allowlist hubbleFilters.FilterFuncs
Expand Down Expand Up @@ -102,10 +103,18 @@ func (i *ioReaderClient) Recv() (*tetragon.GetEventsResponse, error) {
if !hubbleFilters.Apply(i.allowlist, nil, &hubbleV1.Event{Event: &res}) {
continue
}
filterEvent := &res
if len(i.fieldFilters) > 0 && filterEvent.GetProcessExec() != nil { // this is an exec event and we have fieldFilters
// We need a copy of the exec event as modifing the original message
// can cause issues in the process cache (we keep a copy of that message there).
filterEvent = proto.Clone(&res).(*tetragon.GetEventsResponse)
}
for _, filter := range i.fieldFilters {
filter.Filter(&res)
// we need not to change res
// maybe only for exec events
filter.Filter(filterEvent)
}
return &res, nil
return filterEvent, nil
}
if err := i.scanner.Err(); err != nil {
return nil, err
Expand Down
15 changes: 12 additions & 3 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cilium/tetragon/pkg/tracingpolicy"
"github.com/cilium/tetragon/pkg/version"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)

type Listener interface {
Expand Down Expand Up @@ -160,22 +161,30 @@ func (s *Server) GetEventsWG(request *tetragon.GetEventsRequest, server tetragon

// Filter the GetEventsResponse fields
filters := filters.FieldFiltersFromGetEventsRequest(request)
filterEvent := event
if len(filters) > 0 && filterEvent.GetProcessExec() != nil { // this is an exec event and we have fieldFilters
// We need a copy of the exec event as modifing the original message
// can cause issues in the process cache (we keep a copy of that message there).
filterEvent = proto.Clone(event).(*tetragon.GetEventsResponse)
}
for _, filter := range filters {
filter.Filter(event)
// we need not to change res
// maybe only for exec events
filter.Filter(filterEvent)
}

if aggregator != nil {
// Send event to aggregator.
select {
case aggregator.GetEventChannel() <- event:
case aggregator.GetEventChannel() <- filterEvent:
default:
logger.GetLogger().
WithField("request", request).
Warn("Aggregator buffer is full. Consider increasing AggregatorOptions.channel_buffer_size.")
}
} else {
// No need to aggregate. Directly send out the response.
if err = server.Send(event); err != nil {
if err = server.Send(filterEvent); err != nil {
s.ctxCleanupWG.Done()
return err
}
Expand Down

0 comments on commit 0d85e0d

Please sign in to comment.