From 83b5a8309999bc450ae4b1c4311e5d453b675d85 Mon Sep 17 00:00:00 2001 From: Bryce Kahle Date: Mon, 3 Jun 2024 13:07:25 -0700 Subject: [PATCH] perf, ringbuf: add Flush for manual Read/ReadInto wakeup Add a method Flush which interrupts a perf or ringbuf reader and causes it to read all data from the ring. This is very similar to the logic we use to check for data in the ring when a deadline is expired. The semantics of the Read function change slightly: a caller is now guaranteed to receive an os.ErrDeadlineExceeded which wasn't the case when the ring contained data. Signed-off-by: Bryce Kahle Co-authored-by: Lorenz Bauer --- internal/epoll/poller.go | 103 ++++++++++++++++++++++++++-------- internal/epoll/poller_test.go | 24 +++++++- perf/reader.go | 35 +++++++----- perf/reader_test.go | 71 +++++++++++++++++++++++ ringbuf/reader.go | 41 ++++++++++---- ringbuf/reader_test.go | 73 +++++++++++++++++++++++- ringbuf/ring.go | 7 --- 7 files changed, 294 insertions(+), 60 deletions(-) diff --git a/internal/epoll/poller.go b/internal/epoll/poller.go index 2235553b5..ed1c3a3c8 100644 --- a/internal/epoll/poller.go +++ b/internal/epoll/poller.go @@ -1,10 +1,12 @@ package epoll import ( + "errors" "fmt" "math" "os" "runtime" + "slices" "sync" "time" @@ -12,6 +14,8 @@ import ( "github.com/cilium/ebpf/internal/unix" ) +var ErrFlushed = errors.New("data was flushed") + // Poller waits for readiness notifications from multiple file descriptors. // // The wait can be interrupted by calling Close. @@ -21,27 +25,48 @@ type Poller struct { epollMu sync.Mutex epollFd int - eventMu sync.Mutex - event *eventFd + eventMu sync.Mutex + closeEvent *eventFd + flushEvent *eventFd } -func New() (*Poller, error) { +func New() (_ *Poller, err error) { + closeFDOnError := func(fd int) { + if err != nil { + unix.Close(fd) + } + } + closeEventFDOnError := func(e *eventFd) { + if err != nil { + e.close() + } + } + epollFd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC) if err != nil { return nil, fmt.Errorf("create epoll fd: %v", err) } + defer closeFDOnError(epollFd) p := &Poller{epollFd: epollFd} - p.event, err = newEventFd() + p.closeEvent, err = newEventFd() + if err != nil { + return nil, err + } + defer closeEventFDOnError(p.closeEvent) + + p.flushEvent, err = newEventFd() if err != nil { - unix.Close(epollFd) return nil, err } + defer closeEventFDOnError(p.flushEvent) + + if err := p.Add(p.closeEvent.raw, 0); err != nil { + return nil, fmt.Errorf("add close eventfd: %w", err) + } - if err := p.Add(p.event.raw, 0); err != nil { - unix.Close(epollFd) - p.event.close() - return nil, fmt.Errorf("add eventfd: %w", err) + if err := p.Add(p.flushEvent.raw, 0); err != nil { + return nil, fmt.Errorf("add flush eventfd: %w", err) } runtime.SetFinalizer(p, (*Poller).Close) @@ -55,8 +80,8 @@ func New() (*Poller, error) { func (p *Poller) Close() error { runtime.SetFinalizer(p, nil) - // Interrupt Wait() via the event fd if it's currently blocked. - if err := p.wakeWait(); err != nil { + // Interrupt Wait() via the closeEvent fd if it's currently blocked. + if err := p.wakeWaitForClose(); err != nil { return err } @@ -73,9 +98,14 @@ func (p *Poller) Close() error { p.epollFd = -1 } - if p.event != nil { - p.event.close() - p.event = nil + if p.closeEvent != nil { + p.closeEvent.close() + p.closeEvent = nil + } + + if p.flushEvent != nil { + p.flushEvent.close() + p.flushEvent = nil } return nil @@ -118,8 +148,11 @@ func (p *Poller) Add(fd int, id int) error { // Wait for events. // -// Returns the number of pending events or an error wrapping os.ErrClosed if -// Close is called, or os.ErrDeadlineExceeded if EpollWait timeout. +// Returns the number of pending events and any errors. +// +// - [os.ErrClosed] if interrupted by [Close]. +// - [ErrFlushed] if interrupted by [Flush]. +// - [os.ErrDeadlineExceeded] if deadline is reached. func (p *Poller) Wait(events []unix.EpollEvent, deadline time.Time) (int, error) { p.epollMu.Lock() defer p.epollMu.Unlock() @@ -154,16 +187,26 @@ func (p *Poller) Wait(events []unix.EpollEvent, deadline time.Time) (int, error) return 0, fmt.Errorf("epoll wait: %w", os.ErrDeadlineExceeded) } - for _, event := range events[:n] { - if int(event.Fd) == p.event.raw { - // Since we don't read p.event the event is never cleared and + for i := 0; i < n; { + event := events[i] + if int(event.Fd) == p.closeEvent.raw { + // Since we don't read p.closeEvent the event is never cleared and // we'll keep getting this wakeup until Close() acquires the // lock and sets p.epollFd = -1. return 0, fmt.Errorf("epoll wait: %w", os.ErrClosed) } + if int(event.Fd) == p.flushEvent.raw { + // read event to prevent it from continuing to wake + p.flushEvent.read() + err = ErrFlushed + events = slices.Delete(events, i, i+1) + n -= 1 + continue + } + i++ } - return n, nil + return n, err } } @@ -171,16 +214,28 @@ type temporaryError interface { Temporary() bool } -// wakeWait unblocks Wait if it's epoll_wait. -func (p *Poller) wakeWait() error { +// wakeWaitForClose unblocks Wait if it's epoll_wait. +func (p *Poller) wakeWaitForClose() error { + p.eventMu.Lock() + defer p.eventMu.Unlock() + + if p.closeEvent == nil { + return fmt.Errorf("epoll wake: %w", os.ErrClosed) + } + + return p.closeEvent.add(1) +} + +// Flush unblocks Wait if it's epoll_wait, for purposes of reading pending samples +func (p *Poller) Flush() error { p.eventMu.Lock() defer p.eventMu.Unlock() - if p.event == nil { + if p.flushEvent == nil { return fmt.Errorf("epoll wake: %w", os.ErrClosed) } - return p.event.add(1) + return p.flushEvent.add(1) } // eventFd wraps a Linux eventfd. diff --git a/internal/epoll/poller_test.go b/internal/epoll/poller_test.go index 3641fb141..98f949419 100644 --- a/internal/epoll/poller_test.go +++ b/internal/epoll/poller_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/cilium/ebpf/internal/unix" + "github.com/go-quicktest/qt" ) func TestPoller(t *testing.T) { @@ -101,12 +102,33 @@ func TestPollerDeadline(t *testing.T) { }() // Wait for the goroutine to enter the syscall. - time.Sleep(time.Second) + time.Sleep(500 * time.Microsecond) poller.Close() <-done } +func TestPollerFlush(t *testing.T) { + t.Parallel() + + _, poller := mustNewPoller(t) + events := make([]unix.EpollEvent, 1) + + done := make(chan struct{}) + go func() { + defer close(done) + + _, err := poller.Wait(events, time.Time{}) + qt.Check(t, qt.ErrorIs(err, ErrFlushed)) + }() + + // Wait for the goroutine to enter the syscall. + time.Sleep(500 * time.Microsecond) + + poller.Flush() + <-done +} + func mustNewPoller(t *testing.T) (*eventFd, *Poller) { t.Helper() diff --git a/perf/reader.go b/perf/reader.go index 51ad6ced5..4d2257a1a 100644 --- a/perf/reader.go +++ b/perf/reader.go @@ -18,8 +18,9 @@ import ( ) var ( - ErrClosed = os.ErrClosed - errEOR = errors.New("end of ring") + ErrClosed = os.ErrClosed + ErrFlushed = epoll.ErrFlushed + errEOR = errors.New("end of ring") ) var perfEventHeaderSize = binary.Size(perfEventHeader{}) @@ -160,6 +161,8 @@ type Reader struct { overwritable bool bufferSize int + + pendingErr error } // ReaderOptions control the behaviour of the user @@ -318,18 +321,18 @@ func (pr *Reader) SetDeadline(t time.Time) { // Read the next record from the perf ring buffer. // -// The function blocks until there are at least Watermark bytes in one +// The method blocks until there are at least Watermark bytes in one // of the per CPU buffers. Records from buffers below the Watermark // are not returned. // // Records can contain between 0 and 7 bytes of trailing garbage from the ring // depending on the input sample's length. // -// Calling Close interrupts the function. +// Calling [Close] interrupts the method with [os.ErrClosed]. Calling [Flush] +// makes it return all records currently in the ring buffer, followed by [ErrFlushed]. // -// Returns [os.ErrDeadlineExceeded] if a deadline was set and the perf ring buffer -// was empty. Otherwise returns a record and no error, even if the deadline was -// exceeded. +// Returns [os.ErrDeadlineExceeded] if a deadline was set and after all records +// have been read from the ring. // // See [Reader.ReadInto] for a more efficient version of this method. func (pr *Reader) Read() (Record, error) { @@ -356,13 +359,13 @@ func (pr *Reader) ReadInto(rec *Record) error { return fmt.Errorf("perf ringbuffer: %w", ErrClosed) } - deadlineWasExceeded := false for { if len(pr.epollRings) == 0 { - if deadlineWasExceeded { - // All rings were empty when the deadline expired, return + if pe := pr.pendingErr; pe != nil { + // All rings have been emptied since the error occurred, return // appropriate error. - return os.ErrDeadlineExceeded + pr.pendingErr = nil + return pe } // NB: The deferred pauseMu.Unlock will panic if Wait panics, which @@ -371,10 +374,10 @@ func (pr *Reader) ReadInto(rec *Record) error { _, err := pr.poller.Wait(pr.epollEvents, pr.deadline) pr.pauseMu.Lock() - if errors.Is(err, os.ErrDeadlineExceeded) { + if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) { // We've hit the deadline, check whether there is any data in // the rings that we've not been woken up for. - deadlineWasExceeded = true + pr.pendingErr = err } else if err != nil { return err } @@ -463,6 +466,12 @@ func (pr *Reader) BufferSize() int { return pr.bufferSize } +// Flush unblocks Read/ReadInto and successive Read/ReadInto calls will return pending samples at this point, +// until you receive a [ErrFlushed] error. +func (pr *Reader) Flush() error { + return pr.poller.Flush() +} + // NB: Has to be preceded by a call to ring.loadHead. func (pr *Reader) readRecordFromRing(rec *Record, ring *perfEventRing) error { defer ring.writeTail() diff --git a/perf/reader_test.go b/perf/reader_test.go index 39eac2653..a2bf677e4 100644 --- a/perf/reader_test.go +++ b/perf/reader_test.go @@ -68,6 +68,77 @@ func TestReaderSetDeadline(t *testing.T) { if _, err := rd.Read(); !errors.Is(err, os.ErrDeadlineExceeded) { t.Error("Expected os.ErrDeadlineExceeded from second Read, got:", err) } + + rd.SetDeadline(time.Now().Add(10 * time.Millisecond)) + if _, err := rd.Read(); !errors.Is(err, os.ErrDeadlineExceeded) { + t.Error("Expected os.ErrDeadlineExceeded from third Read, got:", err) + } +} + +func TestReaderSetDeadlinePendingEvents(t *testing.T) { + events := perfEventArray(t) + + rd, err := NewReaderWithOptions(events, 4096, ReaderOptions{WakeupEvents: 2}) + if err != nil { + t.Fatal(err) + } + defer rd.Close() + + outputSamples(t, events, 5) + + rd.SetDeadline(time.Now().Add(-time.Second)) + _, rem := checkRecord(t, rd) + qt.Assert(t, qt.Equals(rem, 0), qt.Commentf("expected zero Remaining")) + + outputSamples(t, events, 5) + + // another sample should not be returned before we get ErrFlushed to indicate initial set of samples read + _, err = rd.Read() + if !errors.Is(err, os.ErrDeadlineExceeded) { + t.Error("Expected os.ErrDeadlineExceeded from second Read, got:", err) + } + + // the second sample should now be read + _, _ = checkRecord(t, rd) +} + +func TestReaderFlushPendingEvents(t *testing.T) { + testutils.LockOSThreadToSingleCPU(t) + events := perfEventArray(t) + + rd, err := NewReaderWithOptions(events, 4096, ReaderOptions{WakeupEvents: 2}) + if err != nil { + t.Fatal(err) + } + defer rd.Close() + + outputSamples(t, events, 5) + + wait := make(chan int) + go func() { + wait <- 0 + _, rem := checkRecord(t, rd) + wait <- rem + }() + + <-wait + time.Sleep(10 * time.Millisecond) + err = rd.Flush() + qt.Assert(t, qt.IsNil(err)) + + rem := <-wait + qt.Assert(t, qt.Equals(rem, 0), qt.Commentf("expected zero Remaining")) + + outputSamples(t, events, 5) + + // another sample should not be returned before we get ErrFlushed to indicate initial set of samples read + _, err = rd.Read() + if !errors.Is(err, ErrFlushed) { + t.Error("Expected ErrFlushed from second Read, got:", err) + } + + // the second sample should now be read + _, _ = checkRecord(t, rd) } func outputSamples(tb testing.TB, events *ebpf.Map, sampleSizes ...byte) { diff --git a/ringbuf/reader.go b/ringbuf/reader.go index 82010e27b..3d3ba0ecf 100644 --- a/ringbuf/reader.go +++ b/ringbuf/reader.go @@ -13,9 +13,10 @@ import ( ) var ( - ErrClosed = os.ErrClosed - errEOR = errors.New("end of ring") - errBusy = errors.New("sample not committed yet") + ErrClosed = os.ErrClosed + ErrFlushed = epoll.ErrFlushed + errEOR = errors.New("end of ring") + errBusy = errors.New("sample not committed yet") ) // ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c @@ -55,6 +56,7 @@ type Reader struct { haveData bool deadline time.Time bufferSize int + pendingErr error } // NewReader creates a new BPF ringbuf reader. @@ -127,9 +129,13 @@ func (r *Reader) SetDeadline(t time.Time) { // Read the next record from the BPF ringbuf. // -// Returns os.ErrClosed if Close is called on the Reader, or os.ErrDeadlineExceeded -// if a deadline was set and no valid entry was present. A producer might use BPF_RB_NO_WAKEUP -// which may cause the deadline to expire but a valid entry will be present. +// Calling [Close] interrupts the method with [os.ErrClosed]. Calling [Flush] +// makes it return all records currently in the ring buffer, followed by [ErrFlushed]. +// +// Returns [os.ErrDeadlineExceeded] if a deadline was set and after all records +// have been read from the ring. +// +// See [ReadInto] for a more efficient version of this method. func (r *Reader) Read() (Record, error) { var rec Record return rec, r.ReadInto(&rec) @@ -146,13 +152,18 @@ func (r *Reader) ReadInto(rec *Record) error { for { if !r.haveData { - _, err := r.poller.Wait(r.epollEvents[:cap(r.epollEvents)], r.deadline) - if errors.Is(err, os.ErrDeadlineExceeded) && !r.ring.isEmpty() { - // Ignoring this for reading a valid entry after timeout - // This can occur if the producer submitted to the ring buffer with BPF_RB_NO_WAKEUP - err = nil + if pe := r.pendingErr; pe != nil { + r.pendingErr = nil + return pe } - if err != nil { + + _, err := r.poller.Wait(r.epollEvents[:cap(r.epollEvents)], r.deadline) + if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) { + // Ignoring this for reading a valid entry after timeout or flush. + // This can occur if the producer submitted to the ring buffer + // with BPF_RB_NO_WAKEUP. + r.pendingErr = err + } else if err != nil { return err } r.haveData = true @@ -178,3 +189,9 @@ func (r *Reader) ReadInto(rec *Record) error { func (r *Reader) BufferSize() int { return r.bufferSize } + +// Flush unblocks Read/ReadInto and successive Read/ReadInto calls will return pending samples at this point, +// until you receive a ErrFlushed error. +func (r *Reader) Flush() error { + return r.poller.Flush() +} diff --git a/ringbuf/reader_test.go b/ringbuf/reader_test.go index 131336802..a35f1be92 100644 --- a/ringbuf/reader_test.go +++ b/ringbuf/reader_test.go @@ -7,13 +7,15 @@ import ( "testing" "time" + "github.com/go-quicktest/qt" + "github.com/google/go-cmp/cmp" + "github.com/cilium/ebpf" "github.com/cilium/ebpf/asm" "github.com/cilium/ebpf/internal" "github.com/cilium/ebpf/internal/testutils" "github.com/cilium/ebpf/internal/testutils/fdtrace" "github.com/cilium/ebpf/internal/unix" - "github.com/google/go-cmp/cmp" ) type sampleMessage struct { @@ -284,7 +286,7 @@ func TestReaderNoWakeup(t *testing.T) { t.Error("Expected no error from first Read, got:", err) } if len(record.RawSample) != 5 { - t.Errorf("Expected to read 5 bytes bot got %d", len(record.RawSample)) + t.Errorf("Expected to read 5 bytes but got %d", len(record.RawSample)) } record, err = rd.Read() @@ -293,7 +295,72 @@ func TestReaderNoWakeup(t *testing.T) { t.Error("Expected no error from second Read, got:", err) } if len(record.RawSample) != 7 { - t.Errorf("Expected to read 7 bytes bot got %d", len(record.RawSample)) + t.Errorf("Expected to read 7 bytes but got %d", len(record.RawSample)) + } + + _, err = rd.Read() + if !errors.Is(err, os.ErrDeadlineExceeded) { + t.Errorf("Expected os.ErrDeadlineExceeded from third Read but got %v", err) + } +} + +func TestReaderFlushPendingEvents(t *testing.T) { + testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") + + prog, events := mustOutputSamplesProg(t, + sampleMessage{size: 5, flags: unix.BPF_RB_NO_WAKEUP}, // Read after Flush + sampleMessage{size: 6, flags: unix.BPF_RB_NO_WAKEUP}, // Discard + sampleMessage{size: 7, flags: unix.BPF_RB_NO_WAKEUP}) // Read won't block + + rd, err := NewReader(events) + if err != nil { + t.Fatal(err) + } + defer rd.Close() + + ret, _, err := prog.Test(internal.EmptyBPFContext) + testutils.SkipIfNotSupported(t, err) + if err != nil { + t.Fatal(err) + } + + if errno := syscall.Errno(-int32(ret)); errno != 0 { + t.Fatal("Expected 0 as return value, got", errno) + } + + wait := make(chan *Record) + go func() { + wait <- nil + record, err := rd.Read() + qt.Assert(t, qt.IsNil(err)) + wait <- &record + }() + + <-wait + time.Sleep(10 * time.Millisecond) + err = rd.Flush() + qt.Assert(t, qt.IsNil(err)) + + waitRec := <-wait + if waitRec == nil { + t.Error("Expected to read record but got nil") + } + if waitRec != nil && len(waitRec.RawSample) != 5 { + t.Errorf("Expected to read 5 bytes but got %d", len(waitRec.RawSample)) + } + + record, err := rd.Read() + + if err != nil { + t.Error("Expected no error from second Read, got:", err) + } + if len(record.RawSample) != 7 { + t.Errorf("Expected to read 7 bytes but got %d", len(record.RawSample)) + } + + _, err = rd.Read() + if !errors.Is(err, ErrFlushed) { + t.Errorf("Expected ErrFlushed from third Read but got %v", err) } } diff --git a/ringbuf/ring.go b/ringbuf/ring.go index 4b062ce78..8f8f4bce3 100644 --- a/ringbuf/ring.go +++ b/ringbuf/ring.go @@ -70,13 +70,6 @@ func newRingReader(cons_ptr, prod_ptr *uint64, ring []byte) *ringReader { } } -func (rr *ringReader) isEmpty() bool { - cons := atomic.LoadUint64(rr.cons_pos) - prod := atomic.LoadUint64(rr.prod_pos) - - return prod == cons -} - // To be able to wrap around data, data pages in ring buffers are mapped twice in // a single contiguous virtual region. // Therefore the returned usable size is half the size of the mmaped region.