diff --git a/internal/epoll/poller.go b/internal/epoll/poller.go index 2235553b5..ed1c3a3c8 100644 --- a/internal/epoll/poller.go +++ b/internal/epoll/poller.go @@ -1,10 +1,12 @@ package epoll import ( + "errors" "fmt" "math" "os" "runtime" + "slices" "sync" "time" @@ -12,6 +14,8 @@ import ( "github.com/cilium/ebpf/internal/unix" ) +var ErrFlushed = errors.New("data was flushed") + // Poller waits for readiness notifications from multiple file descriptors. // // The wait can be interrupted by calling Close. @@ -21,27 +25,48 @@ type Poller struct { epollMu sync.Mutex epollFd int - eventMu sync.Mutex - event *eventFd + eventMu sync.Mutex + closeEvent *eventFd + flushEvent *eventFd } -func New() (*Poller, error) { +func New() (_ *Poller, err error) { + closeFDOnError := func(fd int) { + if err != nil { + unix.Close(fd) + } + } + closeEventFDOnError := func(e *eventFd) { + if err != nil { + e.close() + } + } + epollFd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC) if err != nil { return nil, fmt.Errorf("create epoll fd: %v", err) } + defer closeFDOnError(epollFd) p := &Poller{epollFd: epollFd} - p.event, err = newEventFd() + p.closeEvent, err = newEventFd() + if err != nil { + return nil, err + } + defer closeEventFDOnError(p.closeEvent) + + p.flushEvent, err = newEventFd() if err != nil { - unix.Close(epollFd) return nil, err } + defer closeEventFDOnError(p.flushEvent) + + if err := p.Add(p.closeEvent.raw, 0); err != nil { + return nil, fmt.Errorf("add close eventfd: %w", err) + } - if err := p.Add(p.event.raw, 0); err != nil { - unix.Close(epollFd) - p.event.close() - return nil, fmt.Errorf("add eventfd: %w", err) + if err := p.Add(p.flushEvent.raw, 0); err != nil { + return nil, fmt.Errorf("add flush eventfd: %w", err) } runtime.SetFinalizer(p, (*Poller).Close) @@ -55,8 +80,8 @@ func New() (*Poller, error) { func (p *Poller) Close() error { runtime.SetFinalizer(p, nil) - // Interrupt Wait() via the event fd if it's currently blocked. 
- if err := p.wakeWait(); err != nil { + // Interrupt Wait() via the closeEvent fd if it's currently blocked. + if err := p.wakeWaitForClose(); err != nil { return err } @@ -73,9 +98,14 @@ func (p *Poller) Close() error { p.epollFd = -1 } - if p.event != nil { - p.event.close() - p.event = nil + if p.closeEvent != nil { + p.closeEvent.close() + p.closeEvent = nil + } + + if p.flushEvent != nil { + p.flushEvent.close() + p.flushEvent = nil } return nil @@ -118,8 +148,11 @@ func (p *Poller) Add(fd int, id int) error { // Wait for events. // -// Returns the number of pending events or an error wrapping os.ErrClosed if -// Close is called, or os.ErrDeadlineExceeded if EpollWait timeout. +// Returns the number of pending events and any errors. +// +// - [os.ErrClosed] if interrupted by [Close]. +// - [ErrFlushed] if interrupted by [Flush]. +// - [os.ErrDeadlineExceeded] if deadline is reached. func (p *Poller) Wait(events []unix.EpollEvent, deadline time.Time) (int, error) { p.epollMu.Lock() defer p.epollMu.Unlock() @@ -154,16 +187,26 @@ func (p *Poller) Wait(events []unix.EpollEvent, deadline time.Time) (int, error) return 0, fmt.Errorf("epoll wait: %w", os.ErrDeadlineExceeded) } - for _, event := range events[:n] { - if int(event.Fd) == p.event.raw { - // Since we don't read p.event the event is never cleared and + for i := 0; i < n; { + event := events[i] + if int(event.Fd) == p.closeEvent.raw { + // Since we don't read p.closeEvent the event is never cleared and // we'll keep getting this wakeup until Close() acquires the // lock and sets p.epollFd = -1. return 0, fmt.Errorf("epoll wait: %w", os.ErrClosed) } + if int(event.Fd) == p.flushEvent.raw { + // read event to prevent it from continuing to wake + p.flushEvent.read() + err = ErrFlushed + events = slices.Delete(events, i, i+1) + n -= 1 + continue + } + i++ } - return n, nil + return n, err } } @@ -171,16 +214,28 @@ type temporaryError interface { Temporary() bool } -// wakeWait unblocks Wait if it's epoll_wait. 
-func (p *Poller) wakeWait() error { +// wakeWaitForClose unblocks Wait if it's epoll_wait. +func (p *Poller) wakeWaitForClose() error { + p.eventMu.Lock() + defer p.eventMu.Unlock() + + if p.closeEvent == nil { + return fmt.Errorf("epoll wake: %w", os.ErrClosed) + } + + return p.closeEvent.add(1) +} + +// Flush unblocks Wait if it's epoll_wait, for purposes of reading pending samples +func (p *Poller) Flush() error { p.eventMu.Lock() defer p.eventMu.Unlock() - if p.event == nil { + if p.flushEvent == nil { return fmt.Errorf("epoll wake: %w", os.ErrClosed) } - return p.event.add(1) + return p.flushEvent.add(1) } // eventFd wraps a Linux eventfd. diff --git a/internal/epoll/poller_test.go b/internal/epoll/poller_test.go index 3641fb141..98f949419 100644 --- a/internal/epoll/poller_test.go +++ b/internal/epoll/poller_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/cilium/ebpf/internal/unix" + "github.com/go-quicktest/qt" ) func TestPoller(t *testing.T) { @@ -101,12 +102,33 @@ func TestPollerDeadline(t *testing.T) { }() // Wait for the goroutine to enter the syscall. - time.Sleep(time.Second) + time.Sleep(500 * time.Microsecond) poller.Close() <-done } +func TestPollerFlush(t *testing.T) { + t.Parallel() + + _, poller := mustNewPoller(t) + events := make([]unix.EpollEvent, 1) + + done := make(chan struct{}) + go func() { + defer close(done) + + _, err := poller.Wait(events, time.Time{}) + qt.Check(t, qt.ErrorIs(err, ErrFlushed)) + }() + + // Wait for the goroutine to enter the syscall. 
+ time.Sleep(500 * time.Microsecond) + + poller.Flush() + <-done +} + func mustNewPoller(t *testing.T) (*eventFd, *Poller) { t.Helper() diff --git a/perf/reader.go b/perf/reader.go index 51ad6ced5..4d2257a1a 100644 --- a/perf/reader.go +++ b/perf/reader.go @@ -18,8 +18,9 @@ import ( ) var ( - ErrClosed = os.ErrClosed - errEOR = errors.New("end of ring") + ErrClosed = os.ErrClosed + ErrFlushed = epoll.ErrFlushed + errEOR = errors.New("end of ring") ) var perfEventHeaderSize = binary.Size(perfEventHeader{}) @@ -160,6 +161,8 @@ type Reader struct { overwritable bool bufferSize int + + pendingErr error } // ReaderOptions control the behaviour of the user @@ -318,18 +321,18 @@ func (pr *Reader) SetDeadline(t time.Time) { // Read the next record from the perf ring buffer. // -// The function blocks until there are at least Watermark bytes in one +// The method blocks until there are at least Watermark bytes in one // of the per CPU buffers. Records from buffers below the Watermark // are not returned. // // Records can contain between 0 and 7 bytes of trailing garbage from the ring // depending on the input sample's length. // -// Calling Close interrupts the function. +// Calling [Close] interrupts the method with [os.ErrClosed]. Calling [Flush] +// makes it return all records currently in the ring buffer, followed by [ErrFlushed]. // -// Returns [os.ErrDeadlineExceeded] if a deadline was set and the perf ring buffer -// was empty. Otherwise returns a record and no error, even if the deadline was -// exceeded. +// Returns [os.ErrDeadlineExceeded] if a deadline was set and after all records +// have been read from the ring. // // See [Reader.ReadInto] for a more efficient version of this method. 
func (pr *Reader) Read() (Record, error) { @@ -356,13 +359,13 @@ func (pr *Reader) ReadInto(rec *Record) error { return fmt.Errorf("perf ringbuffer: %w", ErrClosed) } - deadlineWasExceeded := false for { if len(pr.epollRings) == 0 { - if deadlineWasExceeded { - // All rings were empty when the deadline expired, return + if pe := pr.pendingErr; pe != nil { + // All rings have been emptied since the error occurred, return // appropriate error. - return os.ErrDeadlineExceeded + pr.pendingErr = nil + return pe } // NB: The deferred pauseMu.Unlock will panic if Wait panics, which @@ -371,10 +374,10 @@ func (pr *Reader) ReadInto(rec *Record) error { _, err := pr.poller.Wait(pr.epollEvents, pr.deadline) pr.pauseMu.Lock() - if errors.Is(err, os.ErrDeadlineExceeded) { + if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) { // We've hit the deadline, check whether there is any data in // the rings that we've not been woken up for. - deadlineWasExceeded = true + pr.pendingErr = err } else if err != nil { return err } @@ -463,6 +466,12 @@ func (pr *Reader) BufferSize() int { return pr.bufferSize } +// Flush unblocks Read/ReadInto and successive Read/ReadInto calls will return pending samples at this point, +// until you receive an [ErrFlushed] error. +func (pr *Reader) Flush() error { + return pr.poller.Flush() +} + // NB: Has to be preceded by a call to ring.loadHead. 
func (pr *Reader) readRecordFromRing(rec *Record, ring *perfEventRing) error { defer ring.writeTail() diff --git a/perf/reader_test.go b/perf/reader_test.go index 39eac2653..a2bf677e4 100644 --- a/perf/reader_test.go +++ b/perf/reader_test.go @@ -68,6 +68,77 @@ func TestReaderSetDeadline(t *testing.T) { if _, err := rd.Read(); !errors.Is(err, os.ErrDeadlineExceeded) { t.Error("Expected os.ErrDeadlineExceeded from second Read, got:", err) } + + rd.SetDeadline(time.Now().Add(10 * time.Millisecond)) + if _, err := rd.Read(); !errors.Is(err, os.ErrDeadlineExceeded) { + t.Error("Expected os.ErrDeadlineExceeded from third Read, got:", err) + } +} + +func TestReaderSetDeadlinePendingEvents(t *testing.T) { + events := perfEventArray(t) + + rd, err := NewReaderWithOptions(events, 4096, ReaderOptions{WakeupEvents: 2}) + if err != nil { + t.Fatal(err) + } + defer rd.Close() + + outputSamples(t, events, 5) + + rd.SetDeadline(time.Now().Add(-time.Second)) + _, rem := checkRecord(t, rd) + qt.Assert(t, qt.Equals(rem, 0), qt.Commentf("expected zero Remaining")) + + outputSamples(t, events, 5) + + // another sample should not be returned before we get ErrFlushed to indicate initial set of samples read + _, err = rd.Read() + if !errors.Is(err, os.ErrDeadlineExceeded) { + t.Error("Expected os.ErrDeadlineExceeded from second Read, got:", err) + } + + // the second sample should now be read + _, _ = checkRecord(t, rd) +} + +func TestReaderFlushPendingEvents(t *testing.T) { + testutils.LockOSThreadToSingleCPU(t) + events := perfEventArray(t) + + rd, err := NewReaderWithOptions(events, 4096, ReaderOptions{WakeupEvents: 2}) + if err != nil { + t.Fatal(err) + } + defer rd.Close() + + outputSamples(t, events, 5) + + wait := make(chan int) + go func() { + wait <- 0 + _, rem := checkRecord(t, rd) + wait <- rem + }() + + <-wait + time.Sleep(10 * time.Millisecond) + err = rd.Flush() + qt.Assert(t, qt.IsNil(err)) + + rem := <-wait + qt.Assert(t, qt.Equals(rem, 0), qt.Commentf("expected zero 
Remaining")) + + outputSamples(t, events, 5) + + // another sample should not be returned before we get ErrFlushed to indicate initial set of samples read + _, err = rd.Read() + if !errors.Is(err, ErrFlushed) { + t.Error("Expected ErrFlushed from second Read, got:", err) + } + + // the second sample should now be read + _, _ = checkRecord(t, rd) } func outputSamples(tb testing.TB, events *ebpf.Map, sampleSizes ...byte) { diff --git a/ringbuf/reader.go b/ringbuf/reader.go index 82010e27b..3d3ba0ecf 100644 --- a/ringbuf/reader.go +++ b/ringbuf/reader.go @@ -13,9 +13,10 @@ import ( ) var ( - ErrClosed = os.ErrClosed - errEOR = errors.New("end of ring") - errBusy = errors.New("sample not committed yet") + ErrClosed = os.ErrClosed + ErrFlushed = epoll.ErrFlushed + errEOR = errors.New("end of ring") + errBusy = errors.New("sample not committed yet") ) // ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c @@ -55,6 +56,7 @@ type Reader struct { haveData bool deadline time.Time bufferSize int + pendingErr error } // NewReader creates a new BPF ringbuf reader. @@ -127,9 +129,13 @@ func (r *Reader) SetDeadline(t time.Time) { // Read the next record from the BPF ringbuf. // -// Returns os.ErrClosed if Close is called on the Reader, or os.ErrDeadlineExceeded -// if a deadline was set and no valid entry was present. A producer might use BPF_RB_NO_WAKEUP -// which may cause the deadline to expire but a valid entry will be present. +// Calling [Close] interrupts the method with [os.ErrClosed]. Calling [Flush] +// makes it return all records currently in the ring buffer, followed by [ErrFlushed]. +// +// Returns [os.ErrDeadlineExceeded] if a deadline was set and after all records +// have been read from the ring. +// +// See [ReadInto] for a more efficient version of this method. 
func (r *Reader) Read() (Record, error) { var rec Record return rec, r.ReadInto(&rec) } @@ -146,13 +152,18 @@ func (r *Reader) ReadInto(rec *Record) error { for { if !r.haveData { - _, err := r.poller.Wait(r.epollEvents[:cap(r.epollEvents)], r.deadline) - if errors.Is(err, os.ErrDeadlineExceeded) && !r.ring.isEmpty() { - // Ignoring this for reading a valid entry after timeout - // This can occur if the producer submitted to the ring buffer with BPF_RB_NO_WAKEUP - err = nil + if pe := r.pendingErr; pe != nil { + r.pendingErr = nil + return pe } - if err != nil { + + _, err := r.poller.Wait(r.epollEvents[:cap(r.epollEvents)], r.deadline) + if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) { + // Ignoring this for reading a valid entry after timeout or flush. + // This can occur if the producer submitted to the ring buffer + // with BPF_RB_NO_WAKEUP. + r.pendingErr = err + } else if err != nil { return err } r.haveData = true @@ -178,3 +189,9 @@ func (r *Reader) ReadInto(rec *Record) error { func (r *Reader) BufferSize() int { return r.bufferSize } + +// Flush unblocks Read/ReadInto and successive Read/ReadInto calls will return pending samples at this point, +// until you receive an ErrFlushed error. 
+func (r *Reader) Flush() error { + return r.poller.Flush() +} diff --git a/ringbuf/reader_test.go b/ringbuf/reader_test.go index 131336802..a35f1be92 100644 --- a/ringbuf/reader_test.go +++ b/ringbuf/reader_test.go @@ -7,13 +7,15 @@ import ( "testing" "time" + "github.com/go-quicktest/qt" + "github.com/google/go-cmp/cmp" + "github.com/cilium/ebpf" "github.com/cilium/ebpf/asm" "github.com/cilium/ebpf/internal" "github.com/cilium/ebpf/internal/testutils" "github.com/cilium/ebpf/internal/testutils/fdtrace" "github.com/cilium/ebpf/internal/unix" - "github.com/google/go-cmp/cmp" ) type sampleMessage struct { @@ -284,7 +286,7 @@ func TestReaderNoWakeup(t *testing.T) { t.Error("Expected no error from first Read, got:", err) } if len(record.RawSample) != 5 { - t.Errorf("Expected to read 5 bytes bot got %d", len(record.RawSample)) + t.Errorf("Expected to read 5 bytes but got %d", len(record.RawSample)) } record, err = rd.Read() @@ -293,7 +295,72 @@ func TestReaderNoWakeup(t *testing.T) { t.Error("Expected no error from second Read, got:", err) } if len(record.RawSample) != 7 { - t.Errorf("Expected to read 7 bytes bot got %d", len(record.RawSample)) + t.Errorf("Expected to read 7 bytes but got %d", len(record.RawSample)) + } + + _, err = rd.Read() + if !errors.Is(err, os.ErrDeadlineExceeded) { + t.Errorf("Expected os.ErrDeadlineExceeded from third Read but got %v", err) + } +} + +func TestReaderFlushPendingEvents(t *testing.T) { + testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") + + prog, events := mustOutputSamplesProg(t, + sampleMessage{size: 5, flags: unix.BPF_RB_NO_WAKEUP}, // Read after Flush + sampleMessage{size: 6, flags: unix.BPF_RB_NO_WAKEUP}, // Discard + sampleMessage{size: 7, flags: unix.BPF_RB_NO_WAKEUP}) // Read won't block + + rd, err := NewReader(events) + if err != nil { + t.Fatal(err) + } + defer rd.Close() + + ret, _, err := prog.Test(internal.EmptyBPFContext) + testutils.SkipIfNotSupported(t, err) + if err != nil { + t.Fatal(err) + } + + if errno 
:= syscall.Errno(-int32(ret)); errno != 0 { + t.Fatal("Expected 0 as return value, got", errno) + } + + wait := make(chan *Record) + go func() { + wait <- nil + record, err := rd.Read() + qt.Assert(t, qt.IsNil(err)) + wait <- &record + }() + + <-wait + time.Sleep(10 * time.Millisecond) + err = rd.Flush() + qt.Assert(t, qt.IsNil(err)) + + waitRec := <-wait + if waitRec == nil { + t.Error("Expected to read record but got nil") + } + if waitRec != nil && len(waitRec.RawSample) != 5 { + t.Errorf("Expected to read 5 bytes but got %d", len(waitRec.RawSample)) + } + + record, err := rd.Read() + + if err != nil { + t.Error("Expected no error from second Read, got:", err) + } + if len(record.RawSample) != 7 { + t.Errorf("Expected to read 7 bytes but got %d", len(record.RawSample)) + } + + _, err = rd.Read() + if !errors.Is(err, ErrFlushed) { + t.Errorf("Expected ErrFlushed from third Read but got %v", err) } } diff --git a/ringbuf/ring.go b/ringbuf/ring.go index 4b062ce78..8f8f4bce3 100644 --- a/ringbuf/ring.go +++ b/ringbuf/ring.go @@ -70,13 +70,6 @@ func newRingReader(cons_ptr, prod_ptr *uint64, ring []byte) *ringReader { } } -func (rr *ringReader) isEmpty() bool { - cons := atomic.LoadUint64(rr.cons_pos) - prod := atomic.LoadUint64(rr.prod_pos) - - return prod == cons -} - // To be able to wrap around data, data pages in ring buffers are mapped twice in // a single contiguous virtual region. // Therefore the returned usable size is half the size of the mmaped region.