From 0d85e0d8b09046c9910bb0c9024205dc7f1a9d08 Mon Sep 17 00:00:00 2001 From: Anastasios Papagiannis Date: Tue, 5 Sep 2023 11:10:35 +0000 Subject: [PATCH] Use a message copy to apply fieldFilters in exec events As we keep them in the process cache, removing fields may affect the correctness of the process cache. In order to fix that, we create a copy of the event and apply fieldFilters only on the copy. FIXES: https://github.com/cilium/tetragon/issues/1428 Signed-off-by: Anastasios Papagiannis --- cmd/tetra/getevents/io_reader_client.go | 15 ++++++++++++--- pkg/server/server.go | 15 ++++++++++++--- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/cmd/tetra/getevents/io_reader_client.go b/cmd/tetra/getevents/io_reader_client.go index ca5e8af8826..2e910479f53 100644 --- a/cmd/tetra/getevents/io_reader_client.go +++ b/cmd/tetra/getevents/io_reader_client.go @@ -16,10 +16,11 @@ import ( hubbleFilters "github.com/cilium/tetragon/pkg/oldhubble/filters" "google.golang.org/grpc" "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" ) // ioReaderClient implements tetragon.FineGuidanceSensors_GetEventsClient. -// ioReaderObserver implements tetragon.FineGuidanceSensorsClient interface. It reads FGS events +// ioReaderObserver implements tetragon.FineGuidanceSensorsClient interface. It reads Tetragon events type ioReaderClient struct { scanner *bufio.Scanner allowlist hubbleFilters.FilterFuncs @@ -102,10 +103,18 @@ func (i *ioReaderClient) Recv() (*tetragon.GetEventsResponse, error) { if !hubbleFilters.Apply(i.allowlist, nil, &hubbleV1.Event{Event: &res}) { continue } + filterEvent := &res + if len(i.fieldFilters) > 0 && filterEvent.GetProcessExec() != nil { // this is an exec event and we have fieldFilters + // We need a copy of the exec event as modifing the original message + // can cause issues in the process cache (we keep a copy of that message there). + filterEvent = proto.Clone(&res).(*tetragon.GetEventsResponse) + } for _, filter := range i.fieldFilters { - filter.Filter(&res) + // we need not to change res + // maybe only for exec events + filter.Filter(filterEvent) } - return &res, nil + return filterEvent, nil } if err := i.scanner.Err(); err != nil { return nil, err diff --git a/pkg/server/server.go b/pkg/server/server.go index 9355f2ef274..b1113d3944c 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -22,6 +22,7 @@ import ( "github.com/cilium/tetragon/pkg/tracingpolicy" "github.com/cilium/tetragon/pkg/version" "github.com/sirupsen/logrus" + "google.golang.org/protobuf/proto" ) type Listener interface { @@ -160,14 +161,22 @@ func (s *Server) GetEventsWG(request *tetragon.GetEventsRequest, server tetragon // Filter the GetEventsResponse fields filters := filters.FieldFiltersFromGetEventsRequest(request) + filterEvent := event + if len(filters) > 0 && filterEvent.GetProcessExec() != nil { // this is an exec event and we have fieldFilters + // We need a copy of the exec event as modifing the original message + // can cause issues in the process cache (we keep a copy of that message there). + filterEvent = proto.Clone(event).(*tetragon.GetEventsResponse) + } for _, filter := range filters { - filter.Filter(event) + // we need not to change res + // maybe only for exec events + filter.Filter(filterEvent) } if aggregator != nil { // Send event to aggregator. select { - case aggregator.GetEventChannel() <- event: + case aggregator.GetEventChannel() <- filterEvent: default: logger.GetLogger(). WithField("request", request). @@ -175,7 +184,7 @@ func (s *Server) GetEventsWG(request *tetragon.GetEventsRequest, server tetragon } } else { // No need to aggregate. Directly send out the response. - if err = server.Send(event); err != nil { + if err = server.Send(filterEvent); err != nil { s.ctxCleanupWG.Done() return err }