diff --git a/internal/pipeline/acct.go b/internal/pipeline/acct.go index 2c9aac1..15726ac 100644 --- a/internal/pipeline/acct.go +++ b/internal/pipeline/acct.go @@ -101,9 +101,6 @@ func (p *Pipeline) startAcct() error { return errors.Wrap(err, "starting probe") } - // Watch the accounting probe for errors. - go p.acctErrorWorker() - log.Info("Started accounting probe and workers") return nil @@ -163,20 +160,3 @@ func (p *Pipeline) acctDestroyWorker() { p.acctSinkMu.RUnlock() } } - -// acctErrorWorker reads from acctProbe's error channel and terminates the -// program when an error occurs. -func (p *Pipeline) acctErrorWorker() { - - errs := p.acctProbe.ErrChan() - - for { - err, ok := <-errs - if !ok { - log.Debug("Pipeline's error event channel closed, stopping worker.") - break - } - - log.Fatalln("Fatal error in acctProbe:", err) - } -} diff --git a/pkg/bpf/errors.go b/pkg/bpf/errors.go index e20f309..0dc12ae 100644 --- a/pkg/bpf/errors.go +++ b/pkg/bpf/errors.go @@ -3,7 +3,6 @@ package bpf import "errors" const ( - errFmtSplitKprobe = "expected string of format 'k(ret)probe/': %s" errFmtSymNotFound = "kernel symbol '%s' not found, conntrack kernel module not loaded" errKernelRelease = "invalid kernel release version '%s'" ) diff --git a/pkg/bpf/integration_test.go b/pkg/bpf/integration_test.go index 7018d66..dbe19fa 100644 --- a/pkg/bpf/integration_test.go +++ b/pkg/bpf/integration_test.go @@ -76,7 +76,6 @@ func TestMain(m *testing.M) { if err := acctProbe.Start(); err != nil { log.Fatal(err) } - go errWorker(acctProbe.ErrChan()) // Run tests, save the return code. rc := m.Run() @@ -325,14 +324,6 @@ func filterWorker(in <-chan Event, out chan<- Event, f func(Event) bool) { } } -// errWorker listens for errors on the Probe's error channel. -// Terminates the test suite when an error occurs. -func errWorker(ec <-chan error) { - for err := range ec { - log.Fatal("unexpected error from Probe:", err) - } -} - // readTimeout attempts a read from an Event channel, timing out // when a message wasn't read after ms milliseconds. func readTimeout(c <-chan Event, ms uint) (Event, error) { diff --git a/pkg/bpf/probe.go b/pkg/bpf/probe.go index f8a1ba4..72bf4a0 100644 --- a/pkg/bpf/probe.go +++ b/pkg/bpf/probe.go @@ -57,9 +57,6 @@ type Probe struct { // Channel for receiving IDs of lost perf events. lost chan uint64 - // perfWorker error channel. - errs chan error - // Started status of the probe. startMu sync.Mutex started bool @@ -274,7 +271,6 @@ func (ap *Probe) Start() error { } ap.lost = make(chan uint64) - ap.errs = make(chan error) // Set up Readers for reading events from the perf ring buffers. r, err := perf.NewReader(ap.collection.Maps[perfUpdateMap], 4096) @@ -318,7 +314,6 @@ func (ap *Probe) Stop() error { } close(ap.lost) - close(ap.errs) if err := ap.disablePerfEvents(); err != nil { return err @@ -338,32 +333,11 @@ func (ap *Probe) Kernel() kernel.Kernel { return ap.kernel } -// ErrChan returns an initialized Probe's unbuffered error channel. -// The error channel is unbuffered because it doesn't make sense to have -// stale error data. If there is no ready consumer on the channel, errors -// are dropped. -// Returns nil if the Probe has not been Start()ed yet. -func (ap *Probe) ErrChan() chan error { - return ap.errs -} - // Stats returns a snapshot copy of the Probe's statistics. func (ap *Probe) Stats() ProbeStats { return ap.stats.Get() } -// sendError safely sends a message on the Probe's unbuffered errChan. -// If there is no ready channel receiver, sendError is a no-op. A return value -// of true means the error was successfully sent on the channel. -func (ap *Probe) sendError(err error) bool { - select { - case ap.errs <- err: - return true - default: - return false - } -} - // updateWorker reads binady flow update events from the Probe's ring buffer, // unmarshals the events into Event structures and sends them on all registered // consumers' event channels. @@ -389,8 +363,7 @@ func (ap *Probe) updateWorker() { var ae Event if err := ae.unmarshalBinary(rec.RawSample); err != nil { - ap.sendError(errors.Wrap(err, "error unmarshaling Event byte array")) - continue + panic(err) } // Fan out update event to all registered consumers. @@ -423,8 +396,7 @@ func (ap *Probe) destroyWorker() { var ae Event if err := ae.unmarshalBinary(rec.RawSample); err != nil { - ap.sendError(errors.Wrap(err, "error unmarshaling Event byte array")) - continue + panic(err) } // Fan out destroy event to all registered consumers.