Skip to content

Commit

Permalink
add Flush for manual Read/ReadInto wakeup
Browse files Browse the repository at this point in the history
Signed-off-by: Bryce Kahle <bryce.kahle@datadoghq.com>
  • Loading branch information
brycekahle committed Jun 18, 2024
1 parent b8dc0ee commit 88a6afe
Show file tree
Hide file tree
Showing 7 changed files with 289 additions and 30 deletions.
79 changes: 61 additions & 18 deletions internal/epoll/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math"
"os"
"runtime"
"slices"
"sync"
"time"

Expand All @@ -21,8 +22,9 @@ type Poller struct {
epollMu sync.Mutex
epollFd int

eventMu sync.Mutex
event *eventFd
eventMu sync.Mutex
closeEvent *eventFd
flushEvent *eventFd
}

func New() (*Poller, error) {
Expand All @@ -32,16 +34,31 @@ func New() (*Poller, error) {
}

p := &Poller{epollFd: epollFd}
p.event, err = newEventFd()
p.closeEvent, err = newEventFd()
if err != nil {
unix.Close(epollFd)
return nil, err
}

if err := p.Add(p.event.raw, 0); err != nil {
p.flushEvent, err = newEventFd()
if err != nil {
p.closeEvent.close()
unix.Close(epollFd)
return nil, err
}

if err := p.Add(p.closeEvent.raw, 0); err != nil {
unix.Close(epollFd)
p.event.close()
return nil, fmt.Errorf("add eventfd: %w", err)
p.closeEvent.close()
p.flushEvent.close()
return nil, fmt.Errorf("add close eventfd: %w", err)
}

if err := p.Add(p.flushEvent.raw, 0); err != nil {
unix.Close(epollFd)
p.closeEvent.close()
p.flushEvent.close()
return nil, fmt.Errorf("add flush eventfd: %w", err)
}

runtime.SetFinalizer(p, (*Poller).Close)
Expand All @@ -55,8 +72,8 @@ func New() (*Poller, error) {
func (p *Poller) Close() error {
runtime.SetFinalizer(p, nil)

// Interrupt Wait() via the event fd if it's currently blocked.
if err := p.wakeWait(); err != nil {
// Interrupt Wait() via the closeEvent fd if it's currently blocked.
if err := p.wakeWaitForClose(); err != nil {
return err
}

Expand All @@ -73,9 +90,14 @@ func (p *Poller) Close() error {
p.epollFd = -1
}

if p.event != nil {
p.event.close()
p.event = nil
if p.closeEvent != nil {
p.closeEvent.close()
p.closeEvent = nil
}

if p.flushEvent != nil {
p.flushEvent.close()
p.flushEvent = nil
}

return nil
Expand Down Expand Up @@ -154,13 +176,22 @@ func (p *Poller) Wait(events []unix.EpollEvent, deadline time.Time) (int, error)
return 0, fmt.Errorf("epoll wait: %w", os.ErrDeadlineExceeded)
}

for _, event := range events[:n] {
if int(event.Fd) == p.event.raw {
// Since we don't read p.event the event is never cleared and
for i := 0; i < n; {
event := events[i]
if int(event.Fd) == p.closeEvent.raw {
// Since we don't read p.closeEvent the event is never cleared and
// we'll keep getting this wakeup until Close() acquires the
// lock and sets p.epollFd = -1.
return 0, fmt.Errorf("epoll wait: %w", os.ErrClosed)
}
if int(event.Fd) == p.flushEvent.raw {
// read event to prevent it from continuing to wake
p.flushEvent.read()
events = slices.Delete(events, i, i+1)
n -= 1
continue
}
i++
}

return n, nil
Expand All @@ -171,16 +202,28 @@ type temporaryError interface {
Temporary() bool
}

// wakeWait unblocks Wait if it's epoll_wait.
func (p *Poller) wakeWait() error {
// wakeWaitForClose interrupts a Wait call that is blocked in epoll_wait so
// that Close can proceed.
//
// It adds 1 to the close eventfd while holding eventMu. Wait reports
// os.ErrClosed when it finds that fd among the ready events. If the close
// eventfd has already been released, an error wrapping os.ErrClosed is
// returned instead.
func (p *Poller) wakeWaitForClose() error {
	p.eventMu.Lock()
	defer p.eventMu.Unlock()

	if ev := p.closeEvent; ev != nil {
		return ev.add(1)
	}

	return fmt.Errorf("epoll wake: %w", os.ErrClosed)
}

// Flush unblocks Wait if it is blocked in epoll_wait, so that pending samples can be read.
func (p *Poller) Flush() error {
p.eventMu.Lock()
defer p.eventMu.Unlock()

if p.event == nil {
if p.flushEvent == nil {
return fmt.Errorf("epoll wake: %w", os.ErrClosed)
}

return p.event.add(1)
return p.flushEvent.add(1)
}

// eventFd wraps a Linux eventfd.
Expand Down
25 changes: 25 additions & 0 deletions internal/epoll/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,31 @@ func TestPoller(t *testing.T) {
t.Fatal(err)
}

go func() {
defer func() {
done <- struct{}{}
}()

events := make([]unix.EpollEvent, 1)

n, err := poller.Wait(events, time.Time{})
if err != nil {
t.Error("error from Wait:", err)
return
}
if n != 0 {
t.Errorf("got %d instead of 0 events", n)
}
}()
if err := poller.Flush(); err != nil {
t.Fatal("Flush returns an error:", err)
}
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("timed out")
}

go read()
select {
case <-done:
Expand Down
15 changes: 15 additions & 0 deletions internal/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,18 @@ func (le *VerifierError) Format(f fmt.State, verb rune) {
fmt.Fprintf(f, "%%!%c(BADVERB)", verb)
}
}

// FlushCompleteError is returned by the perf and ringbuf readers to signal
// that the samples pending at the time of a flush have all been handed out.
//
// Err optionally carries the reason for the flush. The readers set it to
// os.ErrDeadlineExceeded when the flush was caused by an expired deadline, so
// errors.Is(err, os.ErrDeadlineExceeded) keeps working through Unwrap.
type FlushCompleteError struct {
	// Err is the underlying cause, or nil if there is none.
	Err error
}

// Error implements the error interface. The cause, when present, is appended
// to the fixed "flush complete" prefix.
func (fe *FlushCompleteError) Error() string {
	if cause := fe.Err; cause != nil {
		return fmt.Sprintf("flush complete: %s", cause.Error())
	}
	return "flush complete"
}

// Unwrap exposes the underlying cause to errors.Is and errors.As. It returns
// nil when no cause was recorded.
func (fe *FlushCompleteError) Unwrap() error {
	return fe.Err
}
22 changes: 17 additions & 5 deletions perf/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ import (
var (
	// ErrClosed is the error wrapped by Reader methods that are called
	// after the Reader has been closed. It is an alias of os.ErrClosed.
	ErrClosed = os.ErrClosed
	// errEOR is the internal sentinel for reaching the end of a ring.
	errEOR = errors.New("end of ring")
)

type FlushCompleteError = internal.FlushCompleteError

var perfEventHeaderSize = binary.Size(perfEventHeader{})

// perfEventHeader must match 'struct perf_event_header` in <linux/perf_event.h>.
Expand Down Expand Up @@ -160,6 +163,8 @@ type Reader struct {
overwritable bool

bufferSize int

pendingErr error
}

// ReaderOptions control the behaviour of the user
Expand Down Expand Up @@ -356,13 +361,13 @@ func (pr *Reader) ReadInto(rec *Record) error {
return fmt.Errorf("perf ringbuffer: %w", ErrClosed)
}

deadlineWasExceeded := false
for {
if len(pr.epollRings) == 0 {
if deadlineWasExceeded {
// All rings were empty when the deadline expired, return
if pe := pr.pendingErr; pe != nil {
// All rings have been emptied since the error occurred, return
// appropriate error.
return os.ErrDeadlineExceeded
pr.pendingErr = nil
return pe
}

// NB: The deferred pauseMu.Unlock will panic if Wait panics, which
Expand All @@ -374,7 +379,7 @@ func (pr *Reader) ReadInto(rec *Record) error {
if errors.Is(err, os.ErrDeadlineExceeded) {
// We've hit the deadline, check whether there is any data in
// the rings that we've not been woken up for.
deadlineWasExceeded = true
pr.pendingErr = &FlushCompleteError{Err: os.ErrDeadlineExceeded}
} else if err != nil {
return err
}
Expand Down Expand Up @@ -463,6 +468,13 @@ func (pr *Reader) BufferSize() int {
return pr.bufferSize
}

// Flush wakes up a Read/ReadInto call that is blocked waiting for samples.
//
// It records an empty FlushCompleteError as the Reader's pending error and
// then signals the poller. ReadInto returns (and clears) the pending error
// once it has no rings left to drain, so callers should keep calling
// Read/ReadInto to collect the pending samples until they receive a
// FlushCompleteError.
//
// NOTE(review): pendingErr is assigned here without taking any lock in this
// method, while ReadInto reads and clears the same field. Confirm that Flush
// cannot run concurrently with Read/ReadInto, or add synchronization.
//
// NOTE(review): pendingErr is set before the poller is signalled and is not
// reset if poller.Flush fails, so a later Read/ReadInto may still report a
// FlushCompleteError after a failed Flush.
func (pr *Reader) Flush() error {
pr.pendingErr = &FlushCompleteError{}
return pr.poller.Flush()
}

// NB: Has to be preceded by a call to ring.loadHead.
func (pr *Reader) readRecordFromRing(rec *Record, ring *perfEventRing) error {
defer ring.writeTail()
Expand Down
76 changes: 76 additions & 0 deletions perf/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,82 @@ func TestReaderSetDeadline(t *testing.T) {
if _, err := rd.Read(); !errors.Is(err, os.ErrDeadlineExceeded) {
t.Error("Expected os.ErrDeadlineExceeded from second Read, got:", err)
}

rd.SetDeadline(time.Now().Add(10 * time.Millisecond))
if _, err := rd.Read(); !errors.Is(err, os.ErrDeadlineExceeded) {
t.Error("Expected os.ErrDeadlineExceeded from third Read, got:", err)
}
}

func TestReaderSetDeadlinePendingEvents(t *testing.T) {
events := perfEventArray(t)

rd, err := NewReaderWithOptions(events, 4096, ReaderOptions{WakeupEvents: 2})
if err != nil {
t.Fatal(err)
}
defer rd.Close()

outputSamples(t, events, 5)

rd.SetDeadline(time.Now().Add(-time.Second))
_, rem := checkRecord(t, rd)
qt.Assert(t, qt.Equals(rem, 0), qt.Commentf("expected zero Remaining"))

outputSamples(t, events, 5)

// another sample should not be returned before we get FlushCompleteError to indicate initial set of samples read
var fe *FlushCompleteError
_, err = rd.Read()
if !errors.As(err, &fe) {
t.Error("Expected FlushCompleteError from second Read, got:", err)
}
if !errors.Is(err, os.ErrDeadlineExceeded) {
t.Error("Expected os.ErrDeadlineExceeded from second Read, got:", err)
}

// the second sample should now be read
_, rem = checkRecord(t, rd)

Check failure on line 106 in perf/reader_test.go

View workflow job for this annotation

GitHub Actions / Build and Lint

ineffectual assignment to rem (ineffassign)
}

func TestReaderFlushPendingEvents(t *testing.T) {
testutils.LockOSThreadToSingleCPU(t)
events := perfEventArray(t)

rd, err := NewReaderWithOptions(events, 4096, ReaderOptions{WakeupEvents: 2})
if err != nil {
t.Fatal(err)
}
defer rd.Close()

outputSamples(t, events, 5)

wait := make(chan int)
go func() {
wait <- 0
_, rem := checkRecord(t, rd)
wait <- rem
}()

<-wait
time.Sleep(10 * time.Millisecond)
err = rd.Flush()
qt.Assert(t, qt.IsNil(err))

rem := <-wait
qt.Assert(t, qt.Equals(rem, 0), qt.Commentf("expected zero Remaining"))

outputSamples(t, events, 5)

// another sample should not be returned before we get FlushCompleteError to indicate initial set of samples read
var fe *FlushCompleteError
_, err = rd.Read()
if !errors.As(err, &fe) {
t.Error("Expected FlushCompleteError from second Read, got:", err)
}

// the second sample should now be read
_, rem = checkRecord(t, rd)

Check failure on line 146 in perf/reader_test.go

View workflow job for this annotation

GitHub Actions / Build and Lint

ineffectual assignment to rem (ineffassign)
}

func outputSamples(tb testing.TB, events *ebpf.Map, sampleSizes ...byte) {
Expand Down
24 changes: 20 additions & 4 deletions ringbuf/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/internal"
"github.com/cilium/ebpf/internal/epoll"
"github.com/cilium/ebpf/internal/unix"
)
Expand All @@ -18,6 +19,8 @@ var (
errBusy = errors.New("sample not committed yet")
)

type FlushCompleteError = internal.FlushCompleteError

// ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c
type ringbufHeader struct {
Len uint32
Expand Down Expand Up @@ -55,6 +58,8 @@ type Reader struct {
haveData bool
deadline time.Time
bufferSize int

pendingErr error
}

// NewReader creates a new BPF ringbuf reader.
Expand Down Expand Up @@ -146,13 +151,17 @@ func (r *Reader) ReadInto(rec *Record) error {

for {
if !r.haveData {
if pe := r.pendingErr; pe != nil {
r.pendingErr = nil
return pe
}

_, err := r.poller.Wait(r.epollEvents[:cap(r.epollEvents)], r.deadline)
if errors.Is(err, os.ErrDeadlineExceeded) && !r.ring.isEmpty() {
if errors.Is(err, os.ErrDeadlineExceeded) {
// Ignoring this for reading a valid entry after timeout
// This can occur if the producer submitted to the ring buffer with BPF_RB_NO_WAKEUP
err = nil
}
if err != nil {
r.pendingErr = &FlushCompleteError{Err: os.ErrDeadlineExceeded}
} else if err != nil {
return err
}
r.haveData = true
Expand All @@ -178,3 +187,10 @@ func (r *Reader) ReadInto(rec *Record) error {
func (r *Reader) BufferSize() int {
return r.bufferSize
}

// Flush wakes up a Read/ReadInto call that is blocked waiting for samples.
//
// It records an empty FlushCompleteError as the Reader's pending error and
// then signals the poller. ReadInto returns (and clears) the pending error
// the next time it finds haveData unset, so callers should keep calling
// Read/ReadInto to collect the pending samples until they receive a
// FlushCompleteError.
//
// NOTE(review): pendingErr is assigned here without taking any lock in this
// method, while ReadInto reads and clears the same field. Confirm that Flush
// cannot run concurrently with Read/ReadInto, or add synchronization.
//
// NOTE(review): pendingErr is set before the poller is signalled and is not
// reset if poller.Flush fails, so a later Read/ReadInto may still report a
// FlushCompleteError after a failed Flush.
func (r *Reader) Flush() error {
r.pendingErr = &FlushCompleteError{}
return r.poller.Flush()
}
Loading

0 comments on commit 88a6afe

Please sign in to comment.